Compartir a través de

使用 Azure 数据工厂 或 Azure Synapse Analytics 将数据复制到 Azure Databricks Delta Lake 或从中复制数据。

适用于: Azure 数据工厂 Azure Synapse Analytics

本文概述了如何在 Azure 数据工厂 和 Azure Synapse 中使用复制活动,将数据复制到 Azure Databricks Delta Lake 和从中复制数据。 本文基于复制活动一文,该文章概述了复制活动的总体情况。

支持的功能

以下功能支持此Azure Databricks Delta Lake 连接器:

支持的功能 IR
复制活动 (源/接收器) (1) (2)
映射数据流(源/汇)
查询活动 (1) (2)

(1) Azure集成运行时 (2) 自承载集成运行时

通常,该服务支持使用具有以下功能的 Delta Lake 来满足你的各种需求。

  • 复制活动支持 Azure Databricks Delta Lake 连接器将数据从任何支持的源数据存储复制到 Azure Databricks Delta Lake 表,以及从 Delta Lake 表复制到任何支持的目标数据存储。 它利用 Databricks 群集执行数据移动,详见“先决条件”部分
  • Mapping 数据流 支持在 Azure 存储 上以泛型 Delta 格式 作为源和接收器读取和写入 Delta 文件,实现无代码 ETL,并在托管的 Azure Integration Runtime 上运行。
  • Databricks 活动支持在 Delta Lake 之上协调以代码为中心的 ETL 或机器学习工作负荷。

先决条件

若要使用此Azure Databricks Delta Lake 连接器,需要在 Azure Databricks 中设置群集。

  • 要将数据复制到 Delta Lake,Copy Activity 调用 Azure Databricks 群集从 Azure 存储 读取数据。Azure 存储 可以是您的原始数据源或一个暂存区域;服务首先通过内置暂存复制将源数据写入该区域。 从 Delta Lake 作为接收器中了解详细信息。
  • 同样,为了从Delta Lake复制数据,复制活动 会调用 Azure Databricks 集群,将数据写入 Azure 存储,这既可以是原始接收器,也可以是服务用来通过内置暂存复制将数据继续写入最终接收器的暂存区域。 从 Delta Lake 作为来源中了解更多。

Databricks 群集需要有权访问 Azure Blob 或 Azure Data Lake Storage Gen2 帐户,其中存储容器/文件系统用作源/接收器/暂存,还有用于写入 Delta Lake 表的容器/文件系统。

  • 若要使用 Azure Data Lake Storage Gen2,可以在 Databricks 群集上配置 service principal 作为 Apache Spark 配置的一部分。 请遵循直接使用服务主体访问中的步骤。

  • 若要使用 Azure Blob 存储,可以在 Databricks 群集上配置 storage 帐户访问密钥SAS 令牌作为 Apache Spark 配置的一部分。 请按照 使用 RDD API 访问 Azure Blob 存储中的步骤进行操作。

在复制活动执行期间,如果你配置的群集已终止,则该服务会自动启动它。 如果你使用创作 UI 来创作管道,则对于数据预览之类的操作,你需要有一个实时群集,该服务不会代表你启动群集。

指定群集配置

  1. 在“群集模式”下拉列表中,选择“标准” 。

  2. 在“Databricks Runtime 版本”下拉菜单中,选择一个 Databricks 运行时版本。

  3. Spark 配置中添加以下属性以打开自动优化

    spark.databricks.delta.optimizeWrite.enabled true
    spark.databricks.delta.autoCompact.enabled true
    
  4. 根据集成和缩放需求配置群集。

有关群集配置的详细信息,请参阅配置群集

开始

若要使用管道执行复制活动,可以使用以下工具或 SDK 之一:

使用 UI 创建指向 Azure Databricks Delta Lake 的链接服务

使用以下步骤在 Azure 门户 UI 中创建一个链接服务,以连接 Azure Databricks Delta Lake。

  1. 浏览到Azure 数据工厂或 Synapse 工作区中的“管理”选项卡并选择“链接服务”,然后单击“新建”:

  2. 搜索Delta,选择Azure Databricks Delta Lake连接器。

    Azure Databricks Delta Lake 连接器的屏幕截图。

  3. 配置服务详细信息、测试连接并创建新的链接服务。

    Azure Databricks Delta Lake 关联服务配置的截图。

连接器配置详细信息

以下部分提供有关定义特定于 Azure Databricks Delta Lake 连接器的实体的属性的详细信息。

连接的服务属性

此Azure Databricks Delta Lake 连接器支持以下身份验证类型。 有关详细信息,请参阅相应部分。

访问令牌

Azure Databricks Delta Lake 链接服务支持以下属性:

属性 描述 必需
类型 type 属性必须设置为 AzureDatabricksDeltaLake
指定Azure Databricks工作区 URL,例如 https://adb-xxxxxxxxx.xx.databricks.azure.cn
群集ID 指定现有群集的群集 ID。 该群集应该是已创建的交互式群集。
可以通过 Databricks 工作区 ->“群集”->“交互式群集名称”->“配置”->“标记”找到交互式群集的群集 ID。 了解详细信息
accessToken 服务需要访问令牌才能对Azure Databricks进行身份验证。 需从 Databricks 工作区生成访问令牌。 此处提供了查找访问令牌的更多详细步骤。
connectVia 用于连接到数据存储的集成运行时。 可以使用Azure集成运行时或自承载集成运行时(如果数据存储位于专用网络中)。 如果未指定,则使用默认Azure集成运行时。

示例:

{
    "name": "AzureDatabricksDeltaLakeLinkedService",
    "properties": {
        "type": "AzureDatabricksDeltaLake",
        "typeProperties": {
            "domain": "https://adb-xxxxxxxxx.xx.databricks.azure.cn",
            "clusterId": "<cluster id>",
            "accessToken": {
                "type": "SecureString", 
                "value": "<access token>"
          	}
        }
    }
}

系统分配的托管身份验证

若要详细了解针对 Azure 资源的系统分配托管标识,请参阅系统分配托管标识的 Azure 资源

若要使用系统分配的托管标识身份验证,请按照以下步骤授予权限:

  1. 通过复制随数据工厂或 Synapse 工作区生成的“托管标识对象 ID”值,检索托管标识信息

  2. 在 Azure Databricks 中,需为托管标识授予正确的权限。 一般情况下,必须在 Azure Databricks 的 访问控制 (IAM) 中向系统分配的托管标识至少授予 Contributor 角色。

Azure Databricks Delta Lake 链接服务支持以下属性:

属性 描述 必需
类型 type 属性必须设置为 AzureDatabricksDeltaLake
指定Azure Databricks工作区 URL,例如 https://adb-xxxxxxxxx.xx.databricks.azure.cn
群集ID 指定现有群集的群集 ID。 该群集应该是已创建的交互式群集。
可以通过 Databricks 工作区 ->“群集”->“交互式群集名称”->“配置”->“标记”找到交互式群集的群集 ID。 了解详细信息
工作区资源标识符 指定Azure Databricks的工作区资源 ID。
connectVia 用于连接到数据存储的集成运行时。 可以使用Azure集成运行时或自承载集成运行时(如果数据存储位于专用网络中)。 如果未指定,则使用默认Azure集成运行时。

示例:

{
    "name": "AzureDatabricksDeltaLakeLinkedService",
    "properties": {
        "type": "AzureDatabricksDeltaLake",
        "typeProperties": {
            "domain": "https://adb-xxxxxxxxx.xx.databricks.azure.cn",
            "clusterId": "<cluster id>",
            "workspaceResourceId": "<workspace resource id>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

用户分配的托管标识身份验证

若要详细了解Azure资源的用户分配托管标识,请参阅 用户分配的托管标识

若要使用用户分配的托管标识身份验证,请执行以下步骤:

  1. 创建一个或多个用户分配的托管标识并在Azure Databricks中授予权限。 一般情况下,您必须在 Azure Databricks 的 访问控制 (IAM) 中,至少向您的用户分配的托管标识授予 贡献者 角色。

  2. 为数据工厂或 Synapse 工作区分配一个或多个用户分配的托管标识,并为每个用户分配的托管标识创建凭据

Azure Databricks Delta Lake 链接服务支持以下属性:

属性 描述 必需
类型 type 属性必须设置为 AzureDatabricksDeltaLake
指定Azure Databricks工作区 URL,例如 https://adb-xxxxxxxxx.xx.databricks.azure.cn
群集ID 指定现有群集的群集 ID。 该群集应该是已创建的交互式群集。
可以通过 Databricks 工作区 ->“群集”->“交互式群集名称”->“配置”->“标记”找到交互式群集的群集 ID。 了解详细信息
凭据 将用户分配的托管标识指定为凭据对象。
workspaceResourceId(工作区资源ID) 指定Azure Databricks的工作区资源 ID。
connectVia 用于连接到数据存储的集成运行时。 可以使用Azure集成运行时或自承载集成运行时(如果数据存储位于专用网络中)。 如果未指定,则使用默认Azure集成运行时。

示例:

{
    "name": "AzureDatabricksDeltaLakeLinkedService",
    "properties": {
        "type": "AzureDatabricksDeltaLake",
        "typeProperties": {
            "domain": "https://adb-xxxxxxxxx.xx.databricks.azure.cn",
            "clusterId": "<cluster id>",
            "credential": {
                "referenceName": "credential1",
                "type": "CredentialReference"
            },
            "workspaceResourceId": "<workspace resource id>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

数据集属性

有关可用于定义数据集的各部分和属性的完整列表,请参阅数据集一文。

Azure Databricks Delta Lake 数据集支持以下属性。

属性 描述 必需
类型 数据集的 type 属性必须设置为 AzureDatabricksDeltaLakeDataset
数据库 数据库的名称。 对源为“否”,对汇为“是”
Delta 表的名称。 对源为“否”,对汇为“是”

示例:

{
    "name": "AzureDatabricksDeltaLakeDataset",
    "properties": {
        "type": "AzureDatabricksDeltaLakeDataset",
        "typeProperties": {
            "database": "<database name>",
            "table": "<delta table name>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

复制活动 属性

有关可用于定义活动的各部分和属性的完整列表,请参阅管道一文。 本部分提供 Azure Databricks Delta Lake 源和接收器支持的属性列表。

Delta Lake 作为数据源

若要从 Azure Databricks Delta Lake 复制数据,复制活动 source 节支持以下属性。

属性 描述 必需
类型 复制活动源的 type 属性必须设置为 AzureDatabricksDeltaLakeSource
查询 指定用于读取数据的 SQL 查询。 对于时间旅行控制,请遵循以下模式:
- SELECT * FROM events TIMESTAMP AS OF timestamp_expression
- SELECT * FROM events VERSION AS OF version
导出设置 用于从增量表检索数据的高级设置。
exportSettings
类型 导出命令的类型,设置为 AzureDatabricksDeltaLakeExportCommand。
dateFormat 将日期类型格式化为具有日期格式的字符串。 自定义日期格式遵循日期/时间模式中的格式。 如果未指定,则它使用默认值 yyyy-MM-dd
时间戳格式 将时间戳类型格式化为具有时间戳格式的字符串。 自定义日期格式遵循日期/时间模式中的格式。 如果未指定,则它使用默认值 yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

直接从 Delta Lake 复制

如果接收器数据存储和格式符合本节中所述的条件,则可以使用复制活动从 Azure Databricks Delta 表直接复制到接收器。 如果未满足以下条件,服务会检查设置,然后将复制活动运行标记为失败:

  • 目标链接服务Azure Blob 存储Azure Data Lake Storage Gen2。 帐户凭据应在Azure Databricks群集配置中预先配置,从 Prerequisites 了解详细信息。

  • 接收器数据格式为“Parquet”、“带分隔符的文本”或“Avro”,具有以下配置,并且指向文件夹而非文件。

    • 对于“Parquet”格式,压缩编解码器为“none”、“snappy”或或“gzip”。
    • 对于“带分隔符的文本”格式:
      • rowDelimiter 是任意单个字符。
      • compression 可以是“none”、“bzip2”、“gzip”。
      • 不支持 encodingName UTF-7。
    • 对于“Avro”格式,压缩编解码器为“none”、“deflate”或“snappy”。
  • 在复制活动源中,未指定 additionalColumns

  • 如果将数据复制为带分隔符的文本,则在复制活动接收器中,fileExtension 需要是“.csv”。

  • 在复制活动映射中,未启用类型转换。

示例:

"activities":[
    {
        "name": "CopyFromDeltaLake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Delta lake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "AzureDatabricksDeltaLakeSource",
                "sqlReaderQuery": "SELECT * FROM events TIMESTAMP AS OF timestamp_expression"
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

从 Delta Lake 进行的分阶段复制

如果接收器数据存储或格式与直接复制条件不匹配,如上一部分所述,请使用临时Azure存储实例启用内置暂存复制。 分阶段复制功能也能提升吞吐量。 服务将数据从 Azure Databricks Delta Lake 导出到暂存存储中,然后将数据复制到数据接收端,最后从暂存存储中清除临时数据。 若要详细了解如何通过暂存方式复制数据,请参阅暂存复制

若要使用此功能,请创建Azure Blob 存储链接服务Azure Data Lake Storage Gen2链接服务,该服务将存储帐户称为临时暂存。 然后,在复制活动中指定 enableStagingstagingSettings 属性。

注意

应在 Azure Databricks群集配置中预配置暂存存储帐户凭据,有关详细信息,请参阅 Prerequisites

示例:

"activities":[
    {
        "name": "CopyFromDeltaLake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Delta lake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "AzureDatabricksDeltaLakeSource",
                "sqlReaderQuery": "SELECT * FROM events TIMESTAMP AS OF timestamp_expression"
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingStorage",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Delta Lake 作为接收器

若要将数据复制到 Azure Databricks Delta Lake,复制活动 sink 节支持以下属性。

属性 描述 必需
类型 复制活动接收器的 type 属性,设置为 AzureDatabricksDeltaLakeSink
preCopyScript 请为每次运行中在将数据写入 Databricks Delta 表之前执行的 Copy 活动指定一个 SQL 查询。 示例:VACUUM eventsTable DRY RUN。你可以使用此属性清除预加载的数据,或者添加 truncate table 或 Vacuum 语句。
importSettings 用于将数据写入 Delta 表的高级设置。
importSettings
类型 导入命令的类型,设置为 AzureDatabricksDeltaLakeImportCommand。
日期格式 将字符串格式化为具有日期格式的日期类型。 自定义日期格式遵循日期/时间模式中的格式。 如果未指定,则它使用默认值 yyyy-MM-dd
timestampFormat 将字符串格式化为具有时间戳格式的时间戳类型。 自定义日期格式遵循日期/时间模式中的格式。 如果未指定,则它使用默认值 yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

直接将副本复制到 Delta Lake

如果源数据存储和格式符合本节中所述的条件,则可以使用复制活动直接从源复制到 Azure Databricks Delta Lake。 如果未满足以下条件,服务会检查设置,然后将复制活动运行标记为失败:

  • 源链接服务Azure Blob 存储Azure Data Lake Storage Gen2。 帐户凭据应在Azure Databricks群集配置中预先配置,从 Prerequisites 了解详细信息。

  • 源数据格式为“Parquet”、“带分隔符的文本”或“Avro”,具有以下配置,并且指向文件夹而非文件。

    • 对于“Parquet”格式,压缩编解码器为“none”、“snappy”或或“gzip”。
    • 对于带分隔符的文本格式:
      • rowDelimiter 为默认值或任意单个字符。
      • compression 可以是“none”、“bzip2”、“gzip”。
      • 不支持 encodingName UTF-7。
    • 对于“Avro”格式,压缩编解码器为“none”、“deflate”或“snappy”。
  • 在复制活动的源数据中:

    • wildcardFileName 仅包含通配符 * 而未包含 ?,未指定 wildcardFolderName
    • 未指定 prefixmodifiedDateTimeStartmodifiedDateTimeEndenablePartitionDiscovery
    • 未指定 additionalColumns
  • 在复制活动映射中,未启用类型转换。

示例:

"activities":[
    {
        "name": "CopyToDeltaLake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Delta lake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "AzureDatabricksDeltaLakeSink",
                "sqlReaderQuery": "VACUUM eventsTable DRY RUN"
            }
        }
    }
]

分阶段复制到 Delta Lake

如果源数据存储或格式与直接复制条件不匹配,如上一部分所述,请使用临时Azure存储实例启用内置暂存复制。 暂存复制功能也能提供更高的吞吐量。 该服务会自动转换数据以符合暂存存储的数据格式要求,然后从那里将数据加载到 Delta Lake。 最后,它会从存储中清理临时数据。 若要详细了解如何通过暂存方式复制数据,请参阅暂存复制

若要使用此功能,请创建Azure Blob 存储链接服务Azure Data Lake Storage Gen2链接服务,该服务将存储帐户称为临时暂存。 然后,在复制活动中指定 enableStagingstagingSettings 属性。

注意

应在 Azure Databricks群集配置中预配置暂存存储帐户凭据,有关详细信息,请参阅 Prerequisites

示例:

"activities":[
    {
        "name": "CopyToDeltaLake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Delta lake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "AzureDatabricksDeltaLakeSink"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

监控

为其他连接器提供了与之相同的复制活动监视体验。 此外,由于从 Delta Lake 加载和导出数据在 Azure Databricks 群集上运行,因此可以进一步查看详细的群集日志监控性能

查找活动属性

有关属性的详细信息,请参阅查找活动

查找活动最多可以返回 1000 行;如果结果集包含的记录超过此范围,将返回前 1000 行。

有关 复制活动 支持的作为源和汇的数据存储的列表,请参阅 支持的数据存储和格式