本文介绍如何在管道之间移动流表和物化视图。 移动后,你将流转移到的管道将更新表,而不是原始管道。 这在许多方案中非常有用,包括:
- 将大型管道拆分为较小的管道。
- 在单个更大的管道中合并多个管道。
- 更改管道中某些表的刷新频率。
- 将表从使用旧发布模式的管道移动到默认发布模式。 有关旧版发布模式的详细信息,请参阅 管道的旧版发布模式。 若要查看如何一次性迁移整个管道的发布模式,请参阅 在管道中启用默认发布模式。
要求
以下是在管道之间移动表的要求。
运行
ALTER ...
命令时,必须使用 Databricks Runtime 16.3 或更高版本。源管道和目标管道必须是:
- 在同一工作区中
- 由运行操作的 Azure Databricks 用户帐户或服务主体拥有
目标管道必须使用默认发布模式。 这样,便可以将表发布到多个目录和架构。
或者,两个管道都必须使用传统发布模式,并且在设置中必须具有相同的目录和目标值。 有关旧版发布模式的信息,请参阅 LIVE 架构(旧版)。
注释
此功能不支持使用默认发布模式将管道移动到使用旧发布模式的管道。
在管道之间移动表
以下说明如何将流表或物化视图从一个管道移到另一个管道。
如果源管道正在运行,请将其停止。 等待它完全停止。
从源管道的笔记本或文件中删除表的定义,并将其存储在某个位置以供将来参考。
包括管道正常运行所需的任何支持查询或代码。
在笔记本或 SQL 编辑器中,运行以下命令,将表从源管道重新分配到目标管道:
ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name> SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
该命令分别将
ALTER MATERIALIZED VIEW
和ALTER STREAMING TABLE
用于 Unity Catalog 管理的物化视图和流式表。 若要对 Hive 元数据表执行相同的动作,请使用ALTER TABLE
。例如,如果您想将名为
sales
的流式处理表移动到 ID 为abcd1234-ef56-ab78-cd90-1234efab5678
的管道中,请运行以下命令:ALTER STREAMING TABLE sales SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
注释
pipelineId
必须是有效的管道标识符。 不允许null
值。将表的定义添加到目标管道的笔记本/文件中。
注释
如果源和目标之间的目录或目标架构不同,则复制查询可能不起作用。 定义中的部分限定表可以以不同的方式解析。 移动时可能需要更新定义。
移动已完成。 现在可以运行源管道和目标管道。 目标管道会更新该表。
故障排除
下表描述了在管道之间移动表时可能发生的错误。
错误 | DESCRIPTION |
---|---|
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE |
源管道处于默认发布模式,目标使用 LIVE 架构(旧版)模式。 此操作不受支持。 如果源使用默认发布模式,则目标也必须。 |
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE |
仅支持在 Lakeflow 声明性管道之间移动表。 使用 Databricks SQL 创建的流表和实化视图的管道不受支持。 |
DESTINATION_PIPELINE_NOT_FOUND |
pipelines.pipelineId 必须是有效的管道。
pipelineId 不能为 null。 |
在移动后,表在目标位置无法更新。 | 若要在本例中快速缓解,请按照相同的说明将表移回源管道。 |
PIPELINE_PERMISSION_DENIED_NOT_OWNER |
执行移动操作的用户必须同时拥有源管道和目标管道。 |
TABLE_ALREADY_EXISTS |
错误消息中列出的表已存在。 如果管道的支持表已存在,则可能会发生这种情况。 在这种情况下,错误信息中列出的表 DROP 。 |
管道中有多个表的示例
管道可以包含多个表。 你仍然可以在管道之间一次移动一个表。 在此方案中,源管道中有三个表(table_a
、table_b
、table_c
),按顺序相互读取。 我们希望将一个表 table_b
移到另一个管道。
初始源代码管道代码:
import dlt
from pyspark.sql.functions import col
@dlt.table
def table_a():
return spark.read.table("source_table")
# Table to be moved to new pipeline:
@dlt.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
@dlt.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
我们通过复制和删除源中的表定义,将table_b
的pipelineId更新后,移动到另一个管道table_b
。
首先,暂停任何调度,并等待源管道和目标管道上的更新完成。 然后修改源管道以删除要移动的表的代码。 更新的源代码管道示例代码变为:
import dlt
from pyspark.sql.functions import col
@dlt.table
def table_a():
return spark.read.table("source_table")
# Removed, to be in new pipeline:
# @dlt.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )
@dlt.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
转到 SQL 编辑器以运行 ALTER pipelineId
命令。
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
接下来,转到目标管道并添加定义 table_b
。 如果管道设置中的默认目录和架构相同,则无需更改代码。
目标管道代码:
import dlt
from pyspark.sql.functions import col
@dlt.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
如果默认目录和架构在管道设置中有所不同,则必须使用管道的目录和架构添加完全限定的名称。
例如,目标管道代码可以是:
import dlt
from pyspark.sql.functions import col
@dlt.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)
为源和目标管道运行(或重新启用计划)。
管道现在不相交。 从table_c
(目前在目标管道中)读取的查询以及从table_b
(在源管道中)读取的table_b
。 在源管道 table_b
上执行触发任务时,该管道不会更新,因为它已不再由源管道负责管理。 源管道将 table_b
视为管道外部的表。 这相当于 在Unity Catalog 中定义一个不受管道管理的 Delta 表的具体化视图读取。
局限性
以下是在管道之间移动表的限制条件。
- 不支持使用 Databricks SQL 创建的具体化视图和流式处理表。
- 不支持专用表或视图。
- 源管道和目标管道必须是管道。 不支持 Null 管道。
- 源管道和目标管道必须位于同一工作区中。
- 运行移动操作的用户必须拥有源管道和目标管道。
- 如果源管道使用默认发布模式,则目标管道还必须使用默认发布模式。 不能使用默认发布模式将表从管道移动到使用 LIVE 架构(旧版)的管道。 请参阅 LIVE 架构(旧版)。
- 如果源管道和目标管道都使用 LIVE 架构(旧架构),那么它们必须在设置中具有相同的
catalog
和target
值。