配置增量实时表的管道设置
本文提供有关配置增量实时表的管道设置的详细信息。 增量实时表具有用于配置和编辑管道设置的用户界面。 该 UI 还具有一个选项用于显示和编辑 JSON 设置。
注意
可以使用 UI 或 JSON 规范配置大多数设置。 某些高级选项仅在使用 JSON 配置时才可用。
Databricks 建议使用 UI 来熟悉增量实时表设置。 如有必要,可以直接在工作区中编辑 JSON 配置。 将管道部署到新环境或使用 CLI 或REST API时,JSON 配置文件也很有用。
有关增量实时表 JSON 配置设置的完整参考,请参阅增量实时表管道配置。
选择产品版本
选择具有最适合你的管道要求的功能的增量实时表产品版本。 可以使用以下产品版本:
Core
,用于运行流式处理引入工作负载。 如果你的管道不需要变更数据捕获 (CDC) 或增量实时表期望等高级功能,请选择Core
版本。Pro
,用于运行流式处理引入和 CDC 工作负载。Pro
产品版本支持所有Core
功能,此外还支持需要根据源数据的更改更新表的工作负载。Advanced
,用于运行流式处理引入工作负载、CDC 工作负载,以及需要“期望”功能的工作负载。Advanced
产品版本支持Core
和Pro
版本的功能,此外还支持通过增量实时表期望强制实施数据质量约束。
在创建或编辑管道时可以选择产品版本。 可为每个管道选择不同的版本。 请参阅增量实时表产品页。
注意
如果管道包含所选产品版本不支持的功能(例如期望),你将收到解释错误原因的错误消息。 然后,可以编辑该管道以选择适当的版本。
选择管道模式
可以选择根据管道模式连续或使用手动触发器更新管道。 请参阅连续与触发管道执行。
选择群集策略
用户必须有权部署计算才能配置和更新增量实时表管道。 工作区管理员可以配置群集策略,以便为用户提供对增量实时表计算资源的访问权限。 请参阅定义 Delta Live Tables 管道计算的限制。
注意
群集策略是可选的。 如果你缺少增量实时表所需的计算特权,请咨询工作区管理员。
若要确保正确应用群集策略默认值,请在管道配置的群集配置中将
apply_policy_default_values
值设置为true
:{ "clusters": [ { "label": "default", "policy_id": "<policy-id>", "apply_policy_default_values": true } ] }
配置源代码库
可以使用增量实时表 UI 中的文件选择器来配置用于定义管道的源代码。 管道源代码在 Databricks 笔记本中定义,或者在工作区文件所存储的 SQL 或 Python 脚本中定义。 创建或编辑管道时,可以添加一个或多个笔记本或工作区文件,或者添加笔记本和工作区文件的组合。
由于增量实时表会自动分析数据集依赖项来构造管道的处理图,因此你可以按任意顺序添加源代码库。
还可以修改 JSON 文件,以包含工作区文件中存储的 SQL 和 Python 脚本中定义的增量实时表源代码。 以下示例包含笔记本和工作区文件:
{
"name": "Example pipeline 3",
"storage": "dbfs:/pipeline-examples/storage-location/example3",
"libraries": [
{ "notebook": { "path": "/example-notebook_1" } },
{ "notebook": { "path": "/example-notebook_2" } },
{ "file": { "path": "/Workspace/Users/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.sql" } },
{ "file": { "path": "/Workspace/Users/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.py" } }
]
}
指定存储位置
可以为一个将内容发布到 Hive 元存储的管道指定存储位置。 指定位置的主要目的是控制管道写入的数据的对象存储位置。
由于 Delta Live Tables 管道的所有表、数据、检查点和元数据完全由 Delta Live Tables 管理,因此与 Delta Live Tables 数据集的大多数交互都是通过注册到 Hive 元存储或 Unity Catalog 的表进行的。
指定管道输出表的目标架构
虽然此操作是可选的,但每当你的目的不仅仅是开发和测试新管道时,都应该指定一个目标,以便发布管道所创建的表。 将管道发布到目标后,便可以在 Azure Databricks 环境中的其他位置查询数据集。 请参阅将数据从 Delta Live Tables 发布到 Hive 元存储或将 Unity Catalog 与 Delta Live Tables 管道配合使用。
配置计算设置
每个增量实时表管道有两个关联的群集:
updates
群集处理管道更新。maintenance
群集运行日常维护任务。
这些群集使用的配置由管道设置中指定的 clusters
属性确定。
可以使用群集标签添加仅适用于特定群集类型的计算设置。 配置管道群集时可以使用三个标签:
注意
如果只定义一个群集配置,则可以省略群集标签设置。 如果未提供标签的设置,则 default
标签将应用于群集配置。 仅当需要为不同的群集类型自定义设置时,才需要群集标签设置。
default
标签定义要应用于updates
和maintenance
群集的计算设置。 将相同的设置应用于这两个群集可确保将所需的配置(例如存储位置的数据访问凭据)应用于维护群集,从而提高维护运行的可靠性。maintenance
标签定义要仅应用于maintenance
群集的计算设置。 还可以使用maintenance
标签替代由default
标签配置的设置。updates
标签定义要仅应用于updates
群集的设置。 使用updates
标签配置不应应用于maintenance
群集的设置。
使用 default
和 updates
标签定义的设置将合并为 updates
群集创建最终配置。 如果使用 default
和 updates
标签定义相同的设置,则使用 updates
标签定义的设置将替代使用 default
标签定义的设置。
以下示例定义一个 Spark 配置参数,该参数仅添加到 updates
群集的配置中:
{
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
},
{
"label": "updates",
"spark_conf": {
"key": "value"
}
}
]
}
增量实时表的群集设置选项与 Azure Databricks 上的其他计算类似。 与配置其他管道设置一样,你可以修改群集的 JSON 配置以指定不显示在 UI 中的选项。 请参阅计算。
注意
- 由于 Delta Live Tables 运行时会管理管道群集的生命周期并运行自定义版本的 Databricks Runtime,因此你无法在管道配置中手动设置某些群集设置,例如 Spark 版本或群集名称。 请参阅不可由用户设置的群集属性。
- 可以配置增量实时表管道以利用 Photon。 请参阅什么是 Photon?。
选择要运行管道的实例类型
默认情况下,增量实时表为运行管道的驱动程序节点和工作器节点选择实例类型,但你也可以手动配置实例类型。 例如,你可能希望选择实例类型以提高管道性能,或者在运行管道时解决内存问题。 在使用 REST API 或增量实时表 UI 创建或编辑管道时,可以配置实例类型。
若要在增量实时表 UI 中创建或编辑管道时配置实例类型:
- 单击“设置”按钮。
- 在管道设置的“高级”部分中,在“辅助角色类型”和“驱动程序类型”下拉菜单中,选择管道的实例类型。
若要在管道的 JSON 设置中配置实例类型,请单击 JSON 按钮,并在群集配置中输入实例类型配置:
注意
为了避免向 maintenance
群集分配不必要的资源,此示例使用 updates
标签来仅设置 updates
群集的实例类型。 若要将实例类型分配给 updates
和 maintenance
群集,请使用 default
标签或省略标签的设置。 如果未提供标签的设置,则 default
标签将应用于管道群集配置。 请参阅配置计算设置。
{
"clusters": [
{
"label": "updates",
"node_type_id": "Standard_D12_v2",
"driver_node_type_id": "Standard_D3_v2",
"..." : "..."
}
]
}
使用自动缩放来提高效率并减少资源使用量
使用增强型自动缩放来优化管道的群集利用率。 仅当系统确定这些资源可以提高管道处理速度时,增强型自动缩放才会添加额外的资源。 不再需要的资源会被释放;一旦所有管道更新完成,群集就会立即关闭。
若要详细了解增强型自动缩放(包括配置详细信息),请参阅使用增强型自动缩放优化增量实时表管道的群集利用率。
延迟计算关闭
由于 Delta Live Tables 群集在不使用时自动关闭,因此,引用群集配置中设置 autotermination_minutes
的群集策略会导致错误。 要控制群集关闭行为,可以使用开发或生产模式,也可以使用管道配置中的 pipelines.clusterShutdown.delay
设置。 以下示例将 pipelines.clusterShutdown.delay
值设置为 60 秒:
{
"configuration": {
"pipelines.clusterShutdown.delay": "60s"
}
}
如果启用 production
模式,pipelines.clusterShutdown.delay
的默认值为 0 seconds
。 如果启用 development
模式,默认值为 2 hours
。
创建单节点群集
如果在群集设置中将 num_workers
设置为 0,群集将创建为单节点群集。 配置自动缩放群集,并将 min_workers
设置为 0,将 max_workers
设置为 0 也会创建一个单节点群集。
如果配置自动缩放群集并仅将 min_workers
设置为 0,则群集不会创建为单节点群集。 在终止之前,群集将始终至少句有 1 个活动辅助角色。
在增量实时表中创建单节点群集的示例群集配置:
{
"clusters": [
{
"num_workers": 0
}
]
}
配置群集标记
可以使用群集标记来监视管道群集的使用情况。 可以在创建或编辑管道时在增量实时表 UI 中添加群集标记,或者通过编辑管道群集的 JSON 设置来添加标记。
云存储配置
若要访问 Azure 存储,必须使用群集配置中的 spark.conf
设置配置所需的参数(包括访问令牌)。 有关为 Azure Data Lake Storage Gen2 (ADLS Gen2) 存储帐户配置访问权限的示例,请参阅使用管道中的机密安全访问存储凭据。
在 Python 或 SQL 中参数化数据集声明
可以通过管道设置将定义数据集的 Python 和 SQL 代码参数化。 参数化支持以下用例:
- 从代码中分离长路径和其他变量。
- 减少在开发或过渡环境中处理的数据量,以加快测试速度。
- 重用同一转换逻辑来处理多个数据源。
以下示例使用 startDate
配置值将开发管道限制为输入数据的子集:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
管道触发间隔
可使用 pipelines.trigger.interval
控制更新表或整个管道的流的触发间隔。 由于触发管道仅处理每个表一次,因此 pipelines.trigger.interval
仅用于连续管道。
由于流式处理查询与批处理查询的默认值不同,Databricks 建议对各个表设置 pipelines.trigger.interval
。 仅当你的处理需要控制整个管道图的更新时,才对管道设置该值。
使用 Python 中的 spark_conf
或 SQL 中的 SET
对表设置 pipelines.trigger.interval
:
@dlt.table(
spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def <function-name>():
return (<query>)
SET pipelines.trigger.interval=10 seconds;
CREATE OR REFRESH MATERIALIZED VIEW TABLE_NAME
AS SELECT ...
要对管道设置 pipelines.trigger.interval
,请将其添加到管道设置中的 configuration
对象:
{
"configuration": {
"pipelines.trigger.interval": "10 seconds"
}
}
允许非管理员用户从启用了 Unity Catalog 的管道查看驱动程序日志
默认情况下,只有管道所有者和工作区管理员有权从运行启用了 Unity Catalog 的管道的群集中查看驱动程序日志。 可以通过在管道设置中将以下 Spark 配置参数添加到 configuration
对象,为任何具有“可管理”、“可查看”或“可运行”权限的用户启用对驱动程序日志的访问权限:
{
"configuration": {
"spark.databricks.acl.needAdminPermissionToViewLogs": "false"
}
}
为管道事件添加电子邮件通知
可在出现以下情况时配置一个或多个电子邮件地址来接收通知:
- 管道更新成功完成。
- 管道更新失败,出现可重试或不可重试错误。 选择此选项将接收有关所有管道故障的通知。
- 管道更新失败并出现不可重试(致命)错误。 选择此选项将仅在发生不可重试错误时接收通知。
- 单个数据流失败。
若要在创建或编辑管道时配置电子邮件通知,请执行以下操作:
- 单击“添加通知”。
- 输入单个或多个电子邮件地址以接收通知。
- 单击要发送到配置的电子邮件地址的每种通知类型的复选框。
- 单击“添加通知”。
SCD 类型 1 查询的控制逻辑删除管理
以下设置可用于控制 SCD 类型 1 处理期间 DELETE
事件的逻辑删除管理行为:
pipelines.applyChanges.tombstoneGCThresholdInSeconds
:设置此值以匹配无序数据之间的最高预期间隔(以秒为单位)。 默认为 172800 秒(2 天)。pipelines.applyChanges.tombstoneGCFrequencyInSeconds
:此设置控制检查逻辑删除的清理频率(以秒为单位)。 默认值为 1800 秒(30 分钟)。
请参阅 APPLY CHANGES API:使用增量实时表简化变更数据捕获。
配置管道权限
你必须对管道具有 CAN MANAGE
或 IS OWNER
权限才能管理管道的权限。
在边栏中,单击“增量实时表”。
选择某个管道的名称。
单击 kebab 菜单 并选择“权限”。
在“权限设置”中,选择“选择用户、组或服务主体...”下拉菜单,然后选择一个用户、组或服务主体。
从“权限”下拉菜单中选择权限。
单击“添加” 。
单击“ 保存”。
为增量实时表启用 RocksDB 状态存储
在部署管道之前,你可以通过设置以下配置来启用基于 RocksDB 的状态管理:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
若要详细了解 RocksDB 状态存储(包括针对 RocksDB 的配置建议),请参阅在 Azure Databricks 上配置 RocksDB 状态存储。