通过克隆 Hive 元存储管道创建 Unity Catalog 管道

本文介绍了 Databricks REST API 中的 clone a pipeline 请求,以及如何使用它将发布到 Hive 元存储的现有管道复制到发布到 Unity Catalog 的新管道。 调用 clone a pipeline 请求时,它会:

  • 将源代码和配置从现有管道复制到新管道,应用指定的任何配置替代。
  • 使用必要更改更新具体化视图和流式表的定义和引用,以使这些对象由 Unity Catalog 管理。
  • 启动管道更新以迁移管道中任何流式处理表的现有数据和元数据,例如检查点。 这样,这些流式处理表就可以在与原始管道相同的时间点恢复处理。

克隆操作完成后,原始管道和新管道都可以独立运行。

本文包含从 Databricks 笔记本直接或通过 Python 脚本调用 API 请求的示例。

在您开始之前

克隆管道之前,需要满足以下条件:

  • 若要克隆 Hive 元存储管道,管道中定义的表和视图必须将表发布到目标架构。 若要了解如何将目标架构添加到管道,请参阅 配置管道以发布到 Hive 元存储

  • 对要克隆的管道中的 Hive 元存储托管表或视图的引用必须使用目录 (hive_metastore)、架构和表名进行完全限定。 例如,在创建 customers 数据集的以下代码中,表名称参数必须更新为 hive_metastore.sales.customers

    @dlt.table
    def customers():
      return spark.read.table("sales.customers").where(...)
    
  • 克隆操作正在进行时,请勿编辑源 Hive 元存储管道的源代码,包括配置为管道的一部分的笔记本以及存储在 Git 文件夹或工作区文件中的任何模块。

  • 在启动克隆操作时,源 Hive 元数据存储管道不得运行。 如果更新正在运行,请停止更新或等待更新完成。

以下是克隆管道之前的其他重要注意事项:

  • 如果 Hive 元存储管道中的表使用 Python 中的 path 参数或 SQL 中的 LOCATION 指定存储位置,请将 "pipelines.migration.ignoreExplicitPath": "true" 配置传递给克隆请求。 以下说明中包括设置此配置。
  • 如果 Hive 元存储管道包含一个指定了 cloudFiles.schemaLocation 选项值的自动加载程序源,并且在创建 Unity 目录克隆后 Hive 元存储管道将继续运行,则必须在 Hive 元存储管道和克隆的 Unity 目录管道中将 mergeSchema 选项设置为 true。 在克隆之前,将此选项添加到 Hive 元存储管道中,会将该选项复制到新管道。

使用 Databricks REST API 克隆管道

以下示例使用 curl 命令在 Databricks REST API 中调用 clone a pipeline 请求:

curl -X POST \
     --header "Authorization: Bearer <personal-access-token>"  \
     <databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
     --data @clone-pipeline.json

替换为:

  • 使用 Databricks <personal-access-token>
  • 使用 Azure Databricks <databricks-instance>,例如 adb-1234567890123456.7.databricks.azure.cn
  • 使用要克隆的 Hive 元存储管道的唯一标识符的 <pipeline-id>。 可以在 DLT UI 中找到管道 ID。

clone-pipeline.json:

{
  "catalog": "<target-catalog-name>",
  "target": "<target-schema-name>",
  "name": "<new-pipeline-name>"
  "clone_mode": "MIGRATE_TO_UC",
  "configuration": {
    "pipelines.migration.ignoreExplicitPath": "true"
  }
}

替换为:

  • 使用新管道要发布到的 Unity Catalog 中的目录名称的 <target-catalog-name>。 这必须是一个现有目录。
  • <target-schema-name> 如果新的管道应该发布到 Unity Catalog 中的架构,并且与当前架构名称不同,请使用该架构的名称。 此参数是可选的,如果未指定,则使用现有架构名称。
  • <new-pipeline-name> 替换为新管道的可选名称。 如果未指定,则使用源管道名称命名新管道,并追加 [UC]

clone_mode 指定要用于克隆操作的模式。 MIGRATE_TO_UC 是唯一受支持的选项。

使用 configuration 字段指定新管道上的配置。 此处设置的值将替代原始管道中的配置。

clone REST API 请求的响应是新 Unity Catalog 管道的 ID。

从 Databricks 笔记本克隆管道

以下示例从 Python 脚本调用 create a pipeline 请求。 可以使用 Databricks 笔记本运行此脚本:

  1. 为脚本创建新笔记本。 请参阅创建笔记本

  2. 将以下 Python 脚本复制到笔记本的第一个单元中。

  3. 通过替换以下代码更新脚本中的占位符值:

    • 使用 Azure Databricks <databricks-instance>,例如 adb-1234567890123456.7.databricks.azure.cn
    • 使用要克隆的 Hive 元存储管道的唯一标识符的 <pipeline-id>。 可以在 DLT UI 中找到管道 ID。
    • 使用新管道要发布到的 Unity Catalog 中的目录名称的 <target-catalog-name>。 这必须是一个现有目录。
    • <target-schema-name> 如果新的管道应该发布到 Unity Catalog 中的架构,并且与当前架构名称不同,请使用该架构的名称。 此参数是可选的,如果未指定,则使用现有架构名称。
    • <new-pipeline-name> 替换为新管道的可选名称。 如果未指定,则使用源管道名称命名新管道,并追加 [UC]
  4. 运行脚本。 请参阅运行 Databricks 笔记本

    import requests
    
    # Your Databricks workspace URL, with no trailing spaces
    WORKSPACE = "<databricks-instance>"
    
    # The pipeline ID of the Hive metastore pipeline to clone
    SOURCE_PIPELINE_ID = "<pipeline-id>"
    # The target catalog name in Unity Catalog
    TARGET_CATALOG = "<target-catalog-name>"
    # (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
    TARGET_SCHEMA = "<target-schema-name>"
    # (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
    CLONED_PIPELINE_NAME = "<new-pipeline-name>"
    
    # This is the only supported clone mode
    CLONE_MODE = "MIGRATE_TO_UC"
    
    # Specify override configurations
    OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
    
    def get_token():
        ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
        return getattr(ctx, "apiToken")().get()
    
    def check_source_pipeline_exists():
        data = requests.get(
            f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
            headers={"Authorization": f"Bearer {get_token()}"},
        )
    
        assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
    
    def request_pipeline_clone():
        payload = {
          "catalog": TARGET_CATALOG,
          "clone_mode": CLONE_MODE,
        }
        if TARGET_SCHEMA != "":
          payload["target"] = TARGET_SCHEMA
        if CLONED_PIPELINE_NAME != "":
          payload["name"] = CLONED_PIPELINE_NAME
        if OVERRIDE_CONFIGS:
          payload["configuration"] = OVERRIDE_CONFIGS
    
        data = requests.post(
            f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
            headers={"Authorization": f"Bearer {get_token()}"},
            json=payload,
        )
        response = data.json()
        return response
    
    check_source_pipeline_exists()
    request_pipeline_clone()
    

局限性

以下是 DLT clone a pipeline API 请求的限制:

  • 仅支持从配置为使用 Hive 元存储的管道克隆到 Unity Catalog 管道。
  • 只能在从中克隆的管道所在的同一 Azure Databricks 工作区中创建克隆。
  • 要克隆的管道只能包括以下流数据源:
  • 如果要克隆的 Hive 元存储管道使用 Auto Loader 文件通知模式,Databricks 建议在克隆后不要运行该管道。 这是因为运行 Hive 元存储管道会导致从 Unity 目录克隆中删除某些文件通知事件。 如果源 Hive 元存储管道在克隆操作完成后运行,则可以使用具有 cloudFiles.backfillInterval 选项的自动加载程序回填缺失的文件。 若要了解自动加载程序文件通知模式,请参阅 什么是自动加载程序文件通知模式?。 若要了解如何使用自动加载程序回填文件,请参阅 使用 cloudFiles.backfillInterval 触发常规回填,常见自动加载程序选项
  • 在克隆进行期间,两个管道的维护任务会自动暂停。
  • 下面适用于针对克隆的 Unity Catalog 管道中的表的“按时间顺序查看”查询:
    • 如果表版本最初写入到了一个由 Hive 元存储托管的对象,则在查询克隆的 Unity Catalog 对象时未定义使用 timestamp_expression 子句的时间旅行查询。
    • 但是,如果将表版本写入克隆的 Unity Catalog 对象,使用 timestamp_expression 子句的时态查询即可正常工作。
    • 使用 version 子句的时间旅行查询在查询克隆的 Unity Catalog 对象时能够正常工作,即便该版本最初写入到 Hive 元存储托管对象也是如此。
  • 有关将 DLT 与 Unity 目录配合使用时的其他限制,请参阅 Unity 目录管道限制
  • 有关 Unity 目录限制,请参阅 Unity 目录限制