通过克隆 Hive metastore 管道来创建 Unity Catalog 管道

本文介绍了 Databricks REST API 中的 clone a pipeline 请求,以及如何使用它将发布至 Hive 元数据存储的现有管道复制到发布至 Unity 目录系统的新管道。 调用 clone a pipeline 请求时,它会:

  • 将源代码和配置从现有管道复制到新管道,应用指定的任何配置替代。
  • 更新具体化视图和流式处理表定义及其引用,进行必要的更改以由 Unity Catalog 管理这些对象。
  • 启动管道更新,以迁移管道中任意流式表的现有数据和元数据,如检查点等。 这样,这些流式处理表就可以在与原始管道相同的时间点恢复处理。

克隆作完成后,原始管道和新管道都可以独立运行。

本文包含从 Databricks 笔记本直接或通过 Python 脚本调用 API 请求的示例。

在您开始之前

克隆管道之前,需要满足以下条件:

  • 若要克隆 Hive 元存储管道,管道中定义的表和视图必须将表发布到目标架构。 若要了解如何将目标架构添加到管道,请参阅 配置管道以发布到 Hive 元存储

  • 对要克隆的管道中的 Hive 元存储托管表或视图的引用必须具有目录(hive_metastore)、架构和表名称的完全限定。 例如,在创建 customers 数据集的以下代码中,表名称参数必须更新为 hive_metastore.sales.customers

    @dp.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • 克隆作正在进行时,请勿编辑源 Hive 元存储管道的源代码,包括配置为管道的一部分的笔记本以及存储在 Git 文件夹或工作区文件中的任何模块。

  • 启动克隆操作时,源 Hive metastore 管道不得运行。 如果更新正在运行,请停止更新或等待更新完成。

以下是克隆管道之前的其他重要注意事项:

  • 如果 Hive 元存储管道中的表使用 path Python 或 LOCATION SQL 中的参数指定存储位置,请将 "pipelines.migration.ignoreExplicitPath": "true" 配置传递给克隆请求。 以下说明中包括设置此配置。
  • 如果 Hive 元存储管道包含一个自动加载程序源,其中指定了 cloudFiles.schemaLocation 选项的值,并且在创建 Unity 目录克隆后 Hive 元存储管道将保持正常运行,则必须在 Hive 元存储管道和克隆后的 Unity 目录管道中,将 mergeSchema 选项设置为 true。 在克隆之前,将此选项添加到 Hive 元存储管道中将会把该选项复制到新的管道中。

使用 Databricks REST API 克隆管道

以下示例使用 curl 命令在 Databricks REST API 中调用 clone a pipeline 请求:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

替换为:

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

替换为:

  • <target-catalog-name> 包含新管道应发布到的 Unity 目录中的目录的名称。 这必须是现有目录。
  • <target-schema-name> 如果新管道的发布目标与当前架构名称不同,则应标记为 Unity Catalog 中的另一架构名称。 此参数是可选的,如果未指定,则使用现有架构名称。
  • <new-pipeline-name> 具有新管道的可选名称。 如果未指定,则使用追加的源管道名称 [UC] 命名新管道。

clone_mode 指定要用于克隆作的模式。 MIGRATE_TO_UC 是唯一支持的选项。

使用 configuration 字段在新管道上指定配置。 此处设置的值将替代原始管道中的配置。

来自 REST API 请求的响应 clone 是新 Unity Catalog 管道的管道 ID。

从 Databricks 笔记本克隆管道

以下示例从 Python 脚本调用 create a pipeline 请求。 可以使用 Databricks 笔记本运行此脚本:

  1. 为脚本创建新笔记本。 请参阅创建笔记本

  2. 将以下 Python 脚本复制到笔记本的第一个单元中。

  3. 通过替换以下代码更新脚本中的占位符值:

    • <databricks-instance> 使用 Azure Databricks 工作区实例名称,例如 adb-1234567890123456.7.databricks.azure.cn
    • <pipeline-id> 包含要克隆的 Hive 元存储管道的唯一标识符。 可以在 Lakeflow 声明性管道 UI 中找到管道 ID。
    • <target-catalog-name> 包含新管道应发布到的 Unity 目录中的目录的名称。 这必须是现有目录。
    • <target-schema-name> 如果新管道的发布目标与当前架构名称不同,则应标记为 Unity Catalog 中的另一架构名称。 此参数是可选的,如果未指定,则使用现有架构名称。
    • <new-pipeline-name> 具有新管道的可选名称。 如果未指定,则使用追加的源管道名称 [UC] 命名新管道。
  4. 运行脚本。 请参阅 “运行 Databricks 笔记本”。

    import requests
    
    # Your Databricks workspace URL, with no trailing spaces
    WORKSPACE = "<databricks-instance>"
    
    # The pipeline ID of the Hive metastore pipeline to clone
    SOURCE_PIPELINE_ID = "<pipeline-id>"
    # The target catalog name in Unity Catalog
    TARGET_CATALOG = "<target-catalog-name>"
    # (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
    TARGET_SCHEMA = "<target-schema-name>"
    # (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
    CLONED_PIPELINE_NAME = "<new-pipeline-name>"
    
    # This is the only supported clone mode
    CLONE_MODE = "MIGRATE_TO_UC"
    
    # Specify override configurations
    OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
    
    def get_token():
        ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
        return getattr(ctx, "apiToken")().get()
    
    def check_source_pipeline_exists():
        data = requests.get(
            f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
            headers={"Authorization": f"Bearer {get_token()}"},
        )
    
        assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
    
    def request_pipeline_clone():
        payload = {
          "catalog": TARGET_CATALOG,
          "clone_mode": CLONE_MODE,
        }
        if TARGET_SCHEMA != "":
          payload["target"] = TARGET_SCHEMA
        if CLONED_PIPELINE_NAME != "":
          payload["name"] = CLONED_PIPELINE_NAME
        if OVERRIDE_CONFIGS:
          payload["configuration"] = OVERRIDE_CONFIGS
    
        data = requests.post(
            f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
            headers={"Authorization": f"Bearer {get_token()}"},
            json=payload,
        )
        response = data.json()
        return response
    
    check_source_pipeline_exists()
    request_pipeline_clone()
    

局限性

下面是 Lakeflow 声明性管道 clone a pipeline API 请求的限制:

  • 仅支持从配置为使用 Hive 元数据存储的管道克隆到 Unity Catalog 管道。
  • 您只能在您正在克隆的管道所属的同一个 Azure Databricks 工作区中创建克隆。
  • 要克隆的管道只能包括以下流媒体源:
  • Databricks 建议如果要克隆的 Hive 元存储管道使用 Auto Loader 文件通知模式,则在克隆后不要运行 Hive 元存储管道。 这是因为运行 Hive 元存储管道会导致从 Unity 目录克隆中删除某些文件通知事件。 如果源 Hive 元存储管道在克隆操作完成后运行,则可以使用自动加载器配合 cloudFiles.backfillInterval 选项回填缺失的文件。 若要了解自动加载程序文件通知模式,请参阅 在文件通知模式下配置自动加载程序流。 若要了解如何使用自动加载程序回填文件,请参阅 使用 cloudFiles.backfillIntervalCommon Auto Loader 选项触发常规回填。
  • 当克隆正在进行时,两个管道的维护任务会自动暂停。
  • 在克隆版 Unity Catalog 管道中的表上执行时间范围查询如下:
    • 如果表版本最初被写入到由 Hive Metastore 管理的对象中,那么当查询克隆的 Unity Catalog 对象时,使用 timestamp_expression 子句的时光旅行查询将是未定义的。
    • 但是,如果表版本已写入克隆的 Unity 目录对象,则使用 timestamp_expression 子句的时间旅行查询能够正常运行。
    • 使用 version 子句的时间旅行查询在查询克隆的 Unity Catalog 对象时正常工作,即使其版本最初被写入 Hive 元存储托管对象也是如此。
  • 有关将 Lakeflow 声明性管道与 Unity 目录配合使用时的其他限制,请参阅 Unity 目录管道限制
  • 有关 Unity 目录限制,请参阅 Unity 目录限制