本文介绍了 Databricks REST API 中的 clone a pipeline
请求,以及如何使用它将发布至 Hive 元数据存储的现有管道复制到发布至 Unity 目录系统的新管道。 调用 clone a pipeline
请求时,它会:
- 将源代码和配置从现有管道复制到新管道,应用指定的任何配置替代。
- 更新具体化视图和流式处理表定义及其引用,进行必要的更改以由 Unity Catalog 管理这些对象。
- 启动管道更新,以迁移管道中任意流式表的现有数据和元数据,如检查点等。 这样,这些流式处理表就可以在与原始管道相同的时间点恢复处理。
克隆作完成后,原始管道和新管道都可以独立运行。
本文包含从 Databricks 笔记本直接或通过 Python 脚本调用 API 请求的示例。
在您开始之前
克隆管道之前,需要满足以下条件:
若要克隆 Hive 元存储管道,管道中定义的表和视图必须将表发布到目标架构。 若要了解如何将目标架构添加到管道,请参阅 配置管道以发布到 Hive 元存储。
对要克隆的管道中的 Hive 元存储托管表或视图的引用必须具有目录(
hive_metastore
)、架构和表名称的完全限定。 例如,在创建customers
数据集的以下代码中,表名称参数必须更新为hive_metastore.sales.customers
:@dp.table def customers(): return spark.read.table("sales.customers").where(...)
克隆作正在进行时,请勿编辑源 Hive 元存储管道的源代码,包括配置为管道的一部分的笔记本以及存储在 Git 文件夹或工作区文件中的任何模块。
启动克隆操作时,源 Hive metastore 管道不得运行。 如果更新正在运行,请停止更新或等待更新完成。
以下是克隆管道之前的其他重要注意事项:
- 如果 Hive 元存储管道中的表使用
path
Python 或LOCATION
SQL 中的参数指定存储位置,请将"pipelines.migration.ignoreExplicitPath": "true"
配置传递给克隆请求。 以下说明中包括设置此配置。 - 如果 Hive 元存储管道包含一个自动加载程序源,其中指定了
cloudFiles.schemaLocation
选项的值,并且在创建 Unity 目录克隆后 Hive 元存储管道将保持正常运行,则必须在 Hive 元存储管道和克隆后的 Unity 目录管道中,将mergeSchema
选项设置为true
。 在克隆之前,将此选项添加到 Hive 元存储管道中将会把该选项复制到新的管道中。
使用 Databricks REST API 克隆管道
以下示例使用 curl
命令在 Databricks REST API 中调用 clone a pipeline
请求:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
替换为:
-
<personal-access-token>
使用 Databricks 个人访问令牌。 -
<databricks-instance>
使用 Azure Databricks 工作区实例名称,例如adb-1234567890123456.7.databricks.azure.cn
-
<pipeline-id>
包含要克隆的 Hive 元存储管道的唯一标识符。 可以在 Lakeflow 声明性管道 UI 中找到管道 ID。
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
替换为:
-
<target-catalog-name>
包含新管道应发布到的 Unity 目录中的目录的名称。 这必须是现有目录。 -
<target-schema-name>
如果新管道的发布目标与当前架构名称不同,则应标记为 Unity Catalog 中的另一架构名称。 此参数是可选的,如果未指定,则使用现有架构名称。 -
<new-pipeline-name>
具有新管道的可选名称。 如果未指定,则使用追加的源管道名称[UC]
命名新管道。
clone_mode
指定要用于克隆作的模式。
MIGRATE_TO_UC
是唯一支持的选项。
使用 configuration
字段在新管道上指定配置。 此处设置的值将替代原始管道中的配置。
来自 REST API 请求的响应 clone
是新 Unity Catalog 管道的管道 ID。
从 Databricks 笔记本克隆管道
以下示例从 Python 脚本调用 create a pipeline
请求。 可以使用 Databricks 笔记本运行此脚本:
为脚本创建新笔记本。 请参阅创建笔记本。
将以下 Python 脚本复制到笔记本的第一个单元中。
通过替换以下代码更新脚本中的占位符值:
-
<databricks-instance>
使用 Azure Databricks 工作区实例名称,例如adb-1234567890123456.7.databricks.azure.cn
-
<pipeline-id>
包含要克隆的 Hive 元存储管道的唯一标识符。 可以在 Lakeflow 声明性管道 UI 中找到管道 ID。 -
<target-catalog-name>
包含新管道应发布到的 Unity 目录中的目录的名称。 这必须是现有目录。 -
<target-schema-name>
如果新管道的发布目标与当前架构名称不同,则应标记为 Unity Catalog 中的另一架构名称。 此参数是可选的,如果未指定,则使用现有架构名称。 -
<new-pipeline-name>
具有新管道的可选名称。 如果未指定,则使用追加的源管道名称[UC]
命名新管道。
-
运行脚本。 请参阅 “运行 Databricks 笔记本”。
import requests # Your Databricks workspace URL, with no trailing spaces WORKSPACE = "<databricks-instance>" # The pipeline ID of the Hive metastore pipeline to clone SOURCE_PIPELINE_ID = "<pipeline-id>" # The target catalog name in Unity Catalog TARGET_CATALOG = "<target-catalog-name>" # (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used TARGET_SCHEMA = "<target-schema-name>" # (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]" CLONED_PIPELINE_NAME = "<new-pipeline-name>" # This is the only supported clone mode CLONE_MODE = "MIGRATE_TO_UC" # Specify override configurations OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"} def get_token(): ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext() return getattr(ctx, "apiToken")().get() def check_source_pipeline_exists(): data = requests.get( f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}", headers={"Authorization": f"Bearer {get_token()}"}, ) assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!" def request_pipeline_clone(): payload = { "catalog": TARGET_CATALOG, "clone_mode": CLONE_MODE, } if TARGET_SCHEMA != "": payload["target"] = TARGET_SCHEMA if CLONED_PIPELINE_NAME != "": payload["name"] = CLONED_PIPELINE_NAME if OVERRIDE_CONFIGS: payload["configuration"] = OVERRIDE_CONFIGS data = requests.post( f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone", headers={"Authorization": f"Bearer {get_token()}"}, json=payload, ) response = data.json() return response check_source_pipeline_exists() request_pipeline_clone()
局限性
下面是 Lakeflow 声明性管道 clone a pipeline
API 请求的限制:
- 仅支持从配置为使用 Hive 元数据存储的管道克隆到 Unity Catalog 管道。
- 您只能在您正在克隆的管道所属的同一个 Azure Databricks 工作区中创建克隆。
- 要克隆的管道只能包括以下流媒体源:
- 增量源
- 自动加载程序,包括自动加载程序支持的任何数据源。 请参阅 从云对象存储加载文件。
- Apached Kafka 与结构化流式处理。 但是,无法将 Kafka 源配置为使用
kafka.group.id
该选项。 请参阅使用 Apache Kafka 和 Azure Databricks 进行流处理。 - Amazon Kinesis 结合结构化流式处理。 但是,无法将 Kinesis 源配置为
consumerMode
或efo
。
- Databricks 建议如果要克隆的 Hive 元存储管道使用 Auto Loader 文件通知模式,则在克隆后不要运行 Hive 元存储管道。 这是因为运行 Hive 元存储管道会导致从 Unity 目录克隆中删除某些文件通知事件。 如果源 Hive 元存储管道在克隆操作完成后运行,则可以使用自动加载器配合
cloudFiles.backfillInterval
选项回填缺失的文件。 若要了解自动加载程序文件通知模式,请参阅 在文件通知模式下配置自动加载程序流。 若要了解如何使用自动加载程序回填文件,请参阅 使用 cloudFiles.backfillInterval 和 Common Auto Loader 选项触发常规回填。 - 当克隆正在进行时,两个管道的维护任务会自动暂停。
- 在克隆版 Unity Catalog 管道中的表上执行时间范围查询如下:
- 如果表版本最初被写入到由 Hive Metastore 管理的对象中,那么当查询克隆的 Unity Catalog 对象时,使用
timestamp_expression
子句的时光旅行查询将是未定义的。 - 但是,如果表版本已写入克隆的 Unity 目录对象,则使用
timestamp_expression
子句的时间旅行查询能够正常运行。 - 使用
version
子句的时间旅行查询在查询克隆的 Unity Catalog 对象时正常工作,即使其版本最初被写入 Hive 元存储托管对象也是如此。
- 如果表版本最初被写入到由 Hive Metastore 管理的对象中,那么当查询克隆的 Unity Catalog 对象时,使用
- 有关将 Lakeflow 声明性管道与 Unity 目录配合使用时的其他限制,请参阅 Unity 目录管道限制。
- 有关 Unity 目录限制,请参阅 Unity 目录限制。