Copy data to and from Azure Databricks Delta Lake by using Azure Data Factory

Applies to: Azure Data Factory, Azure Synapse Analytics

This article outlines how to use the Copy activity in Azure Data Factory to copy data to and from Azure Databricks Delta Lake. It builds on the Copy activity in Azure Data Factory article, which presents a general overview of the Copy activity.

Supported capabilities

This Azure Databricks Delta Lake connector is supported for the following activities:

  • Copy activity
  • Lookup activity

In general, Azure Data Factory supports Delta Lake with the following capabilities to meet your various needs.

  • Copy activity supports the Azure Databricks Delta Lake connector to copy data from any supported source data store to an Azure Databricks delta lake table, and from a delta lake table to any supported sink data store. It leverages your Databricks cluster to perform the data movement; see details in the Prerequisites section.
  • Databricks activities support orchestrating your code-centric ETL or machine learning workloads on top of delta lake.

Prerequisites

To use this Azure Databricks Delta Lake connector, you need to set up a cluster in Azure Databricks.

  • To copy data to delta lake, the Copy activity invokes the Azure Databricks cluster to read data from an Azure Storage account, which is either your original source or a staging area to which Data Factory first writes the source data via built-in staged copy. Learn more from Delta lake as the sink.
  • Similarly, to copy data from delta lake, the Copy activity invokes the Azure Databricks cluster to write data to an Azure Storage account, which is either your original sink or a staging area from which Data Factory continues to write data to the final sink via built-in staged copy. Learn more from Delta lake as the source.

The Databricks cluster needs to have access to the Azure Blob or Azure Data Lake Storage Gen2 account: both the storage container/file system used for source/sink/staging and the container/file system where you want to write the Delta Lake tables.

  • To use Azure Data Lake Storage Gen2, you can configure a service principal or storage account access key on the Databricks cluster as part of the Apache Spark configuration.

  • To use Azure Blob storage, you can configure a storage account access key or SAS token on the Databricks cluster as part of the Apache Spark configuration.
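
For instance, a minimal sketch of such Spark configuration entries, using an account key referenced through a Databricks secret scope (the storage account, scope, and secret names are hypothetical placeholders; the first line targets ADLS Gen2, the second Blob storage):

    fs.azure.account.key.<storage-account-name>.dfs.core.windows.net {{secrets/<scope-name>/<secret-name>}}
    fs.azure.account.key.<storage-account-name>.blob.core.windows.net {{secrets/<scope-name>/<secret-name>}}

Referencing the key through a secret scope avoids storing it in plain text in the cluster configuration; pasting the raw key value also works but is less secure.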

During copy activity execution, if the cluster you configured has been terminated, Data Factory automatically starts it. If you author the pipeline by using the Data Factory authoring UI, operations like data preview require a live cluster; Data Factory won't start the cluster on your behalf.

Specify the cluster configuration

  1. In the Cluster Mode drop-down, select Standard.

  2. In the Databricks Runtime Version drop-down, select a Databricks runtime version.

  3. Turn on Auto Optimize by adding the following properties to your Spark configuration:

    spark.databricks.delta.optimizeWrite.enabled true
    spark.databricks.delta.autoCompact.enabled true
    
  4. Configure your cluster depending on your integration and scaling needs.

Get started

To perform the Copy activity with a pipeline, you can use one of the following tools or SDKs:

  • The Copy Data tool
  • The Azure portal
  • The .NET SDK
  • The Python SDK
  • Azure PowerShell
  • The REST API
  • The Azure Resource Manager template

The following sections provide details about properties that define Data Factory entities specific to the Azure Databricks Delta Lake connector.

Linked service properties

The following properties are supported for an Azure Databricks Delta Lake linked service.

| Property | Description | Required |
| --- | --- | --- |
| type | The type property must be set to **AzureDatabricksDeltaLake**. | Yes |
| domain | Specify the Azure Databricks workspace URL, for example `https://adb-xxxxxxxxx.xx.databricks.azure.cn`. | Yes |
| clusterId | Specify the cluster ID of an existing cluster. It should be an already created Interactive Cluster. You can find the Cluster ID of an Interactive Cluster on Databricks workspace -> Clusters -> Interactive Cluster Name -> Configuration -> Tags. | Yes |
| accessToken | An access token is required for Data Factory to authenticate to Azure Databricks. It needs to be generated from the Databricks workspace. | Yes |
| connectVia | The integration runtime that is used to connect to the data store. You can use the Azure integration runtime or a self-hosted integration runtime (if your data store is located in a private network). If not specified, it uses the default Azure integration runtime. | No |

Example:

{
    "name": "AzureDatabricksDeltaLakeLinkedService",
    "properties": {
        "type": "AzureDatabricksDeltaLake",
        "typeProperties": {
            "domain": "https://adb-xxxxxxxxx.xx.databricks.azure.cn",
            "clusterId": "<cluster id>",
            "accessToken": {
                "type": "SecureString", 
                "value": "<access token>"
            }
        }
    }
}

Dataset properties

For a full list of sections and properties available for defining datasets, see the Datasets article.

The following properties are supported for the Azure Databricks Delta Lake dataset.

| Property | Description | Required |
| --- | --- | --- |
| type | The type property of the dataset must be set to **AzureDatabricksDeltaLakeDataset**. | Yes |
| database | Name of the database. | No for source, yes for sink |
| table | Name of the delta table. | No for source, yes for sink |

Example:

{
    "name": "AzureDatabricksDeltaLakeDataset",
    "properties": {
        "type": "AzureDatabricksDeltaLakeDataset",
        "typeProperties": {
            "database": "<database name>",
            "table": "<delta table name>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

Copy activity properties

For a full list of sections and properties available for defining activities, see the Pipelines article. This section provides a list of properties supported by the Azure Databricks Delta Lake source and sink.

Delta lake as source

To copy data from Azure Databricks Delta Lake, the following properties are supported in the Copy activity source section.

| Property | Description | Required |
| --- | --- | --- |
| type | The type property of the Copy activity source must be set to **AzureDatabricksDeltaLakeSource**. | Yes |
| query | Specify the SQL query to read data. For time travel control, follow this pattern:<br>- `SELECT * FROM events TIMESTAMP AS OF timestamp_expression`<br>- `SELECT * FROM events VERSION AS OF version` | No |
| exportSettings | Advanced settings used to retrieve data from the delta table. | No |
| ***Under exportSettings:*** | | |
| type | The type of export command, set to **AzureDatabricksDeltaLakeExportCommand**. | Yes |
| dateFormat | Format date type to string with a date format. Custom date formats follow the formats at datetime pattern. If not specified, it uses the default value `yyyy-MM-dd`. | No |
| timestampFormat | Format timestamp type to string with a timestamp format. Custom date formats follow the formats at datetime pattern. If not specified, it uses the default value `yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]`. | No |
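
For instance, a hedged sketch of a source definition that reads a historical snapshot via time travel and overrides the timestamp format (the table name and timestamp value are hypothetical):

"source": {
    "type": "AzureDatabricksDeltaLakeSource",
    "query": "SELECT * FROM events TIMESTAMP AS OF '2021-01-01T00:00:00.000Z'",
    "exportSettings": {
        "type": "AzureDatabricksDeltaLakeExportCommand",
        "timestampFormat": "yyyy-MM-dd HH:mm:ss"
    }
}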

Direct copy from delta lake

If your sink data store and format meet the criteria described in this section, you can use the Copy activity to directly copy from an Azure Databricks Delta table to the sink. Data Factory checks the settings and fails the Copy activity run if the following criteria are not met:

  • The sink linked service is Azure Blob storage or Azure Data Lake Storage Gen2. The account credential should be pre-configured in the Azure Databricks cluster configuration; learn more from Prerequisites.

  • The sink data format is Parquet, delimited text, or Avro with the following configurations, and points to a folder instead of a file.

    • For Parquet format, the compression codec is none, snappy, or gzip.
    • For delimited text format:
      • rowDelimiter is any single character.
      • compression can be none, bzip2, or gzip.
      • encodingName UTF-7 is not supported.
    • For Avro format, the compression codec is none, deflate, or snappy.
  • In the Copy activity source, additionalColumns is not specified.

  • If copying data to delimited text, in the Copy activity sink, fileExtension needs to be ".csv" (a sketch follows this list).

  • In the Copy activity mapping, type conversion is not enabled.
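
To illustrate the fileExtension requirement, a sink fragment for delimited text output might look like the following. This is a sketch assuming an ADLS Gen2 store; the setting names come from the general delimited text format documentation, not from the original article:

"sink": {
    "type": "DelimitedTextSink",
    "storeSettings": {
        "type": "AzureBlobFSWriteSettings"
    },
    "formatSettings": {
        "type": "DelimitedTextWriteSettings",
        "fileExtension": ".csv"
    }
}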

Example:

"activities":[
    {
        "name": "CopyFromDeltaLake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Delta lake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "AzureDatabricksDeltaLakeSource",
                "sqlReaderQuery": "SELECT * FROM events TIMESTAMP AS OF timestamp_expression"
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Staged copy from delta lake

When your sink data store or format doesn't match the direct copy criteria described in the previous section, enable the built-in staged copy using an interim Azure storage instance. The staged copy feature also provides better throughput. Data Factory exports data from Azure Databricks Delta Lake into the staging storage, then copies the data to the sink, and finally cleans up your temporary data from the staging storage. See Staged copy for details about copying data by using staging.

To use this feature, create an Azure Blob storage linked service or Azure Data Lake Storage Gen2 linked service that refers to the storage account as the interim staging. Then specify the enableStaging and stagingSettings properties in the Copy activity.

Note

The staging storage account credential should be pre-configured in the Azure Databricks cluster configuration; learn more from Prerequisites.

Example:

"activities":[
    {
        "name": "CopyFromDeltaLake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Delta lake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "AzureDatabricksDeltaLakeSource",
                "sqlReaderQuery": "SELECT * FROM events TIMESTAMP AS OF timestamp_expression"
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingStorage",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Delta lake as sink

To copy data to Azure Databricks Delta Lake, the following properties are supported in the Copy activity sink section.

| Property | Description | Required |
| --- | --- | --- |
| type | The type property of the Copy activity sink, set to **AzureDatabricksDeltaLakeSink**. | Yes |
| preCopyScript | Specify a SQL query for the Copy activity to run before writing data into the Databricks delta table in each run. You can use this property to clean up the preloaded data, or add a TRUNCATE TABLE or VACUUM statement. | No |
| importSettings | Advanced settings used to write data into the delta table. | No |
| ***Under importSettings:*** | | |
| type | The type of import command, set to **AzureDatabricksDeltaLakeImportCommand**. | Yes |
| dateFormat | Format string to date type with a date format. Custom date formats follow the formats at datetime pattern. If not specified, it uses the default value `yyyy-MM-dd`. | No |
| timestampFormat | Format string to timestamp type with a timestamp format. Custom date formats follow the formats at datetime pattern. If not specified, it uses the default value `yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]`. | No |
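
For instance, a hedged sketch of a sink that truncates the target table before each load and overrides the date format (the table name is hypothetical):

"sink": {
    "type": "AzureDatabricksDeltaLakeSink",
    "preCopyScript": "TRUNCATE TABLE events",
    "importSettings": {
        "type": "AzureDatabricksDeltaLakeImportCommand",
        "dateFormat": "yyyy-MM-dd"
    }
}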

Direct copy to delta lake

If your source data store and format meet the criteria described in this section, you can use the Copy activity to directly copy from the source to Azure Databricks Delta Lake. Azure Data Factory checks the settings and fails the Copy activity run if the following criteria are not met:

  • The source linked service is Azure Blob storage or Azure Data Lake Storage Gen2. The account credential should be pre-configured in the Azure Databricks cluster configuration; learn more from Prerequisites.

  • The source data format is Parquet, delimited text, or Avro with the following configurations, and points to a folder instead of a file.

    • For Parquet format, the compression codec is none, snappy, or gzip.
    • For delimited text format:
      • rowDelimiter is the default, or any single character.
      • compression can be none, bzip2, or gzip.
      • encodingName UTF-7 is not supported.
    • For Avro format, the compression codec is none, deflate, or snappy.
  • In the Copy activity source (a sketch follows this list):

    • wildcardFileName only contains the wildcard * but not ?, and wildcardFolderName is not specified.
    • prefix, modifiedDateTimeStart, modifiedDateTimeEnd, and enablePartitionDiscovery are not specified.
    • additionalColumns is not specified.
  • In the Copy activity mapping, type conversion is not enabled.
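
As a sketch of source settings that satisfy these constraints, assuming a Parquet source on ADLS Gen2 (the type names come from the general Parquet and ADLS Gen2 connector documentation, not from the original article):

"source": {
    "type": "ParquetSource",
    "storeSettings": {
        "type": "AzureBlobFSReadSettings",
        "recursive": true,
        "wildcardFileName": "*"
    }
}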

Example:

"activities":[
    {
        "name": "CopyToDeltaLake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Delta lake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "AzureDatabricksDeltaLakeSink"
            }
        }
    }
]

Staged copy to delta lake

When your source data store or format doesn't match the direct copy criteria described in the previous section, enable the built-in staged copy using an interim Azure storage instance. The staged copy feature also provides better throughput. Data Factory automatically converts the data to meet the format requirements in the staging storage, then loads the data into delta lake from there. Finally, it cleans up your temporary data from the storage. See Staged copy for details about copying data by using staging.

To use this feature, create an Azure Blob storage linked service or Azure Data Lake Storage Gen2 linked service that refers to the storage account as the interim staging. Then specify the enableStaging and stagingSettings properties in the Copy activity.

Note

The staging storage account credential should be pre-configured in the Azure Databricks cluster configuration; learn more from Prerequisites.

Example:

"activities":[
    {
        "name": "CopyToDeltaLake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Delta lake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "AzureDatabricksDeltaLakeSink"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Monitoring

Azure Data Factory provides the same copy activity monitoring experience as other connectors.

Lookup activity properties

For more information about the properties, see Lookup activity.

Next steps

For a list of data stores supported as sources and sinks by the Copy activity in Data Factory, see supported data stores and formats.