本文介绍了 Databricks REST API 中的 clone a pipeline
请求,以及如何使用它将发布到 Hive 元存储的现有管道复制到发布到 Unity Catalog 的新管道。 调用 clone a pipeline
请求时,它会:
- 将源代码和配置从现有管道复制到新管道,应用指定的任何配置替代。
- 使用必要更改更新具体化视图和流式表的定义和引用,以使这些对象由 Unity Catalog 管理。
- 启动管道更新以迁移管道中任何流式处理表的现有数据和元数据,例如检查点。 这样,这些流式处理表就可以在与原始管道相同的时间点恢复处理。
克隆操作完成后,原始管道和新管道都可以独立运行。
本文包含从 Databricks 笔记本直接或通过 Python 脚本调用 API 请求的示例。
在您开始之前
克隆管道之前,需要满足以下条件:
若要克隆 Hive 元存储管道,管道中定义的表和视图必须将表发布到目标架构。 若要了解如何将目标架构添加到管道,请参阅 配置管道以发布到 Hive 元存储。
对要克隆的管道中的 Hive 元存储托管表或视图的引用必须使用目录 (
hive_metastore
)、架构和表名进行完全限定。 例如,在创建customers
数据集的以下代码中,表名称参数必须更新为hive_metastore.sales.customers
:@dlt.table def customers(): return spark.read.table("sales.customers").where(...)
克隆操作正在进行时,请勿编辑源 Hive 元存储管道的源代码,包括配置为管道的一部分的笔记本以及存储在 Git 文件夹或工作区文件中的任何模块。
在启动克隆操作时,源 Hive 元数据存储管道不得运行。 如果更新正在运行,请停止更新或等待更新完成。
以下是克隆管道之前的其他重要注意事项:
- 如果 Hive 元存储管道中的表使用 Python 中的
path
参数或 SQL 中的LOCATION
指定存储位置,请将"pipelines.migration.ignoreExplicitPath": "true"
配置传递给克隆请求。 以下说明中包括设置此配置。 - 如果 Hive 元存储管道包含一个指定了
cloudFiles.schemaLocation
选项值的自动加载程序源,并且在创建 Unity 目录克隆后 Hive 元存储管道将继续运行,则必须在 Hive 元存储管道和克隆的 Unity 目录管道中将mergeSchema
选项设置为true
。 在克隆之前,将此选项添加到 Hive 元存储管道中,会将该选项复制到新管道。
使用 Databricks REST API 克隆管道
以下示例使用 curl
命令在 Databricks REST API 中调用 clone a pipeline
请求:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
替换为:
- 使用 Databricks
<personal-access-token>
的 - 使用 Azure Databricks
<databricks-instance>
的 ,例如adb-1234567890123456.7.databricks.azure.cn
- 使用要克隆的 Hive 元存储管道的唯一标识符的
<pipeline-id>
。 可以在 DLT UI 中找到管道 ID。
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
替换为:
- 使用新管道要发布到的 Unity Catalog 中的目录名称的
<target-catalog-name>
。 这必须是一个现有目录。 -
<target-schema-name>
如果新的管道应该发布到 Unity Catalog 中的架构,并且与当前架构名称不同,请使用该架构的名称。 此参数是可选的,如果未指定,则使用现有架构名称。 -
<new-pipeline-name>
替换为新管道的可选名称。 如果未指定,则使用源管道名称命名新管道,并追加[UC]
。
clone_mode
指定要用于克隆操作的模式。
MIGRATE_TO_UC
是唯一受支持的选项。
使用 configuration
字段指定新管道上的配置。 此处设置的值将替代原始管道中的配置。
clone
REST API 请求的响应是新 Unity Catalog 管道的 ID。
从 Databricks 笔记本克隆管道
以下示例从 Python 脚本调用 create a pipeline
请求。 可以使用 Databricks 笔记本运行此脚本:
为脚本创建新笔记本。 请参阅创建笔记本。
将以下 Python 脚本复制到笔记本的第一个单元中。
通过替换以下代码更新脚本中的占位符值:
- 使用 Azure Databricks
<databricks-instance>
的 ,例如adb-1234567890123456.7.databricks.azure.cn
- 使用要克隆的 Hive 元存储管道的唯一标识符的
<pipeline-id>
。 可以在 DLT UI 中找到管道 ID。 - 使用新管道要发布到的 Unity Catalog 中的目录名称的
<target-catalog-name>
。 这必须是一个现有目录。 -
<target-schema-name>
如果新的管道应该发布到 Unity Catalog 中的架构,并且与当前架构名称不同,请使用该架构的名称。 此参数是可选的,如果未指定,则使用现有架构名称。 -
<new-pipeline-name>
替换为新管道的可选名称。 如果未指定,则使用源管道名称命名新管道,并追加[UC]
。
- 使用 Azure Databricks
运行脚本。 请参阅运行 Databricks 笔记本。
import requests # Your Databricks workspace URL, with no trailing spaces WORKSPACE = "<databricks-instance>" # The pipeline ID of the Hive metastore pipeline to clone SOURCE_PIPELINE_ID = "<pipeline-id>" # The target catalog name in Unity Catalog TARGET_CATALOG = "<target-catalog-name>" # (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used TARGET_SCHEMA = "<target-schema-name>" # (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]" CLONED_PIPELINE_NAME = "<new-pipeline-name>" # This is the only supported clone mode CLONE_MODE = "MIGRATE_TO_UC" # Specify override configurations OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"} def get_token(): ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext() return getattr(ctx, "apiToken")().get() def check_source_pipeline_exists(): data = requests.get( f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}", headers={"Authorization": f"Bearer {get_token()}"}, ) assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!" def request_pipeline_clone(): payload = { "catalog": TARGET_CATALOG, "clone_mode": CLONE_MODE, } if TARGET_SCHEMA != "": payload["target"] = TARGET_SCHEMA if CLONED_PIPELINE_NAME != "": payload["name"] = CLONED_PIPELINE_NAME if OVERRIDE_CONFIGS: payload["configuration"] = OVERRIDE_CONFIGS data = requests.post( f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone", headers={"Authorization": f"Bearer {get_token()}"}, json=payload, ) response = data.json() return response check_source_pipeline_exists() request_pipeline_clone()
局限性
以下是 DLT clone a pipeline
API 请求的限制:
- 仅支持从配置为使用 Hive 元存储的管道克隆到 Unity Catalog 管道。
- 只能在从中克隆的管道所在的同一 Azure Databricks 工作区中创建克隆。
- 要克隆的管道只能包括以下流数据源:
- Delta 源
- 自动加载程序,包括自动加载程序支持的任何数据源。 请参阅 从云对象存储加载文件。
- Apache Kafka 结合结构化流式处理。 但是,无法将 Kafka 源配置为使用
kafka.group.id
选项。 请参阅使用 Apache Kafka 和 Azure Databricks 进行流处理。 - Amazon Kinesis 结合结构化流式处理。 但是,无法将 Kinesis 源配置为将
consumerMode
设置为efo
。
- 如果要克隆的 Hive 元存储管道使用 Auto Loader 文件通知模式,Databricks 建议在克隆后不要运行该管道。 这是因为运行 Hive 元存储管道会导致从 Unity 目录克隆中删除某些文件通知事件。 如果源 Hive 元存储管道在克隆操作完成后运行,则可以使用具有
cloudFiles.backfillInterval
选项的自动加载程序回填缺失的文件。 若要了解自动加载程序文件通知模式,请参阅 什么是自动加载程序文件通知模式?。 若要了解如何使用自动加载程序回填文件,请参阅 使用 cloudFiles.backfillInterval 触发常规回填,常见自动加载程序选项。 - 在克隆进行期间,两个管道的维护任务会自动暂停。
- 下面适用于针对克隆的 Unity Catalog 管道中的表的“按时间顺序查看”查询:
- 如果表版本最初写入到了一个由 Hive 元存储托管的对象,则在查询克隆的 Unity Catalog 对象时未定义使用
timestamp_expression
子句的时间旅行查询。 - 但是,如果将表版本写入克隆的 Unity Catalog 对象,使用
timestamp_expression
子句的时态查询即可正常工作。 - 使用
version
子句的时间旅行查询在查询克隆的 Unity Catalog 对象时能够正常工作,即便该版本最初写入到 Hive 元存储托管对象也是如此。
- 如果表版本最初写入到了一个由 Hive 元存储托管的对象,则在查询克隆的 Unity Catalog 对象时未定义使用
- 有关将 DLT 与 Unity 目录配合使用时的其他限制,请参阅 Unity 目录管道限制。
- 有关 Unity 目录限制,请参阅 Unity 目录限制。