Transformation with Azure Databricks

APPLIES TO: Azure Data Factory Azure Synapse Analytics

In this tutorial, you create an end-to-end pipeline that contains the Validation, Copy data, and Notebook activities in Azure Data Factory.

  • Validation ensures that your source dataset is ready for downstream consumption before you trigger the copy and analytics job.

  • Copy data duplicates the source dataset to the sink storage, which is mounted as DBFS in the Azure Databricks notebook. In this way, the dataset can be consumed directly by Spark.

  • Notebook triggers the Databricks notebook that transforms the dataset. It also adds the dataset to a processed folder or to Azure Synapse Analytics (formerly SQL Data Warehouse).

For simplicity, the template in this tutorial doesn't create a scheduled trigger. You can add one if necessary; a sketch of one way to do that follows the pipeline diagram.

Diagram of the pipeline
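
One way to add a scheduled trigger later is with the azure-mgmt-datafactory Python SDK. The following is a minimal, hedged sketch rather than part of the template: the resource group, factory, pipeline, and trigger names are placeholders, the model names assume a recent (track 2) azure-mgmt-datafactory release, and Azure China endpoints may require additional credential and cloud configuration.

    # Minimal sketch: create and start a daily schedule trigger for the template's pipeline.
    # Assumes azure-identity and a recent azure-mgmt-datafactory; angle-bracket values are placeholders.
    from datetime import datetime, timezone

    from azure.identity import DefaultAzureCredential
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import (
        PipelineReference, ScheduleTrigger, ScheduleTriggerRecurrence,
        TriggerPipelineReference, TriggerResource)

    adf = DataFactoryManagementClient(DefaultAzureCredential(), "<subscription id>")

    recurrence = ScheduleTriggerRecurrence(
        frequency="Day", interval=1,
        start_time=datetime.now(timezone.utc), time_zone="UTC")

    trigger = ScheduleTrigger(
        recurrence=recurrence,
        pipelines=[TriggerPipelineReference(
            pipeline_reference=PipelineReference(reference_name="<pipeline name>"),
            parameters={})])  # supply the pipeline parameters your pipeline expects

    adf.triggers.create_or_update(
        "<resource group>", "<data factory name>", "DailyTrigger",
        TriggerResource(properties=trigger))
    adf.triggers.begin_start("<resource group>", "<data factory name>", "DailyTrigger").result()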

Prerequisites

  • An Azure Blob storage account with a container called sinkdata for use as a sink.

    Make note of the storage account name, container name, and access key. You'll need these values later in the template. If you haven't created the sinkdata container yet, a short sketch for creating it programmatically follows this list.

  • An Azure Databricks workspace.
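
The portal or Azure Storage Explorer is the simplest way to create the container. If you prefer to script it, here's a minimal sketch using the azure-storage-blob (v12) Python package; the connection string is a placeholder that you supply.

    # Minimal sketch: create the sinkdata container if it doesn't already exist.
    # Assumes the azure-storage-blob v12 package; supply your own connection string.
    from azure.core.exceptions import ResourceExistsError
    from azure.storage.blob import BlobServiceClient

    service = BlobServiceClient.from_connection_string("<your storage connection string>")
    try:
        service.create_container("sinkdata")
        print("Created container 'sinkdata'.")
    except ResourceExistsError:
        print("Container 'sinkdata' already exists.")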

Import a notebook for Transformation

To import a Transformation notebook to your Databricks workspace:

  1. Sign in to your Azure Databricks workspace, and then select Import.

    Menu command for importing a workspace

    Your workspace path can be different from the one shown, but remember it for later.

  2. Select Import from: URL. In the text box, enter https://adflabstaging1.blob.core.chinacloudapi.cn/share/Transformations.html.

    Selections for importing a notebook

  3. Now let's update the Transformation notebook with your storage connection information.

    In the imported notebook, go to command 5, as shown in the following code snippet. (A quick check that the mount succeeded is sketched after these steps.)

    • Replace <storage name> and <access key> with your own storage connection information.
    • Use the storage account with the sinkdata container.
    # Supply storageName and accessKey values
    storageName = "<storage name>"
    accessKey = "<access key>"

    try:
      # Mount the sinkdata container so the notebook can read and write it through DBFS.
      dbutils.fs.mount(
        source = "wasbs://sinkdata@" + storageName + ".blob.core.chinacloudapi.cn/",
        mount_point = "/mnt/Data Factorydata",
        extra_configs = {"fs.azure.account.key." + storageName + ".blob.core.chinacloudapi.cn": accessKey})

    except Exception as e:
      # The error message has a long stack trace. This code tries to print just the relevant line indicating what failed.
      import re
      result = re.findall(r"^\s*Caused by:\s*\S+:\s*(.*)$", str(e), flags=re.MULTILINE)
      if result:
        print(result[-1])  # Print only the relevant error message
      else:
        print(e)  # Otherwise print the whole stack trace
    
  4. Generate a Databricks access token for Data Factory to access Databricks.

    1. In your Databricks workspace, select your user profile icon in the upper right.
    2. Select User Settings.

      Menu command for user settings

    3. Select Generate New Token under the Access Tokens tab.
    4. Select Generate.

    The Generate button

    Save the access token for later use in creating a Databricks linked service. The access token looks something like dapi32db32cbb4w6eee18b7d87e45exxxxxx.
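
After you run command 5 with your values filled in, you can confirm the mount from another notebook cell. This is a minimal sketch that runs inside the Databricks notebook, where dbutils and display are predefined:

    # Runs inside the Databricks notebook; dbutils and display are available without imports.
    # Confirm that the sinkdata container is mounted where the pipeline expects it.
    mount_points = [m.mountPoint for m in dbutils.fs.mounts()]
    print("/mnt/Data Factorydata" in mount_points)

    # List whatever is currently under the mount (the Copy data activity will add files later).
    display(dbutils.fs.ls("/mnt/Data Factorydata"))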

How to use this template

  1. Go to the Transformation with Azure Databricks template and create new linked services for the following connections.

    Connection settings

    • Source Blob Connection - to access the source data.

      For this exercise, you can use the public blob storage that contains the source files. Reference the following screenshot for the configuration. Use the following SAS URL to connect to source storage (read-only access). A sketch for checking this SAS access from a notebook follows these steps.

      https://storagewithdata.blob.core.chinacloudapi.cn/data?sv=2018-03-28&si=read%20and%20list&sr=c&sig=PuyyS6%2FKdB2JxcZN0kPlmHSBlD8uIKyzhBWmWzznkBw%3D

      Selections for authentication method and SAS URL

    • Destination Blob Connection - to store the copied data.

      In the New linked service window, select your sink storage blob.

      Sink storage blob as a new linked service

    • Azure Databricks - to connect to the Databricks cluster.

      Create a Databricks linked service by using the access token that you generated previously. You can opt to select an interactive cluster if you have one. This example uses the New job cluster option.

      Selections for connecting to the cluster

  2. Select Use this template. You'll see a pipeline created.

    Create a pipeline
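
Optionally, you can sanity-check the read-only SAS access to the public source storage from a Databricks notebook before running the pipeline. The sketch below is a connectivity check only and is not part of the template: it uses the documented fs.azure.sas.<container>.<account> Spark configuration pattern with the account (storagewithdata) and container (data) taken from the SAS URL above, and it reads the files as plain text purely to prove access.

    # Runs inside the Databricks notebook; spark is predefined there.
    # The SAS token is the query string portion of the SAS URL shown above.
    sas_token = "sv=2018-03-28&si=read%20and%20list&sr=c&sig=PuyyS6%2FKdB2JxcZN0kPlmHSBlD8uIKyzhBWmWzznkBw%3D"

    spark.conf.set(
        "fs.azure.sas.data.storagewithdata.blob.core.chinacloudapi.cn",
        sas_token)

    # Connectivity check only: count the lines in the source container when read as raw text.
    df = spark.read.text("wasbs://data@storagewithdata.blob.core.chinacloudapi.cn/")
    print(df.count())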

Pipeline introduction and configuration

In the new pipeline, most settings are configured automatically with default values. Review the configurations of your pipeline and make any necessary changes.

  1. In the Validation activity Availability flag, verify that the source Dataset value is set to SourceAvailabilityDataset, which you created earlier.

    Source dataset value

  2. In the Copy data activity file-to-blob, check the Source and Sink tabs. Change settings if necessary.

    • Source tab

      Source tab

    • Sink tab

      Sink tab

  3. In the Notebook activity Transformation, review and update the paths and settings as needed.

    The Databricks linked service should be prepopulated with the value from a previous step, as shown:

      Populated value for the Databricks linked service

    To check the Notebook settings:

    1. Select the Settings tab. For Notebook path, verify that the default path is correct. You might need to browse and choose the correct notebook path.

      Notebook path

    2. Expand the Base parameters selector and verify that the parameters match what is shown in the following screenshot. These parameters are passed to the Databricks notebook from Data Factory. (A sketch of how the notebook can read them is shown after these steps.)

      Base parameters

  4. Verify that the pipeline parameters match what is shown in the following screenshot:

    Pipeline parameters

  5. Connect to your datasets.

    Note

    In the following datasets, the file path has been automatically specified in the template. If any changes are required, make sure that you specify the path for both container and directory in case of a connection error.

    • SourceAvailabilityDataset - to check that the source data is available.

      Selections for linked service and file path for SourceAvailabilityDataset

    • SourceFilesDataset - to access the source data.

      Selections for linked service and file path for SourceFilesDataset

    • DestinationFilesDataset - to copy the data into the sink destination location. Use the following values:

      • Linked service - sinkBlob_LS, created in a previous step.

      • File path - sinkdata/staged_sink.

        Selections for linked service and file path for DestinationFilesDataset

  6. Select Debug to run the pipeline. You can find the link to Databricks logs for more detailed Spark logs.

    Link to Databricks logs from the output

    You can also verify the data file by using Azure Storage Explorer.

    Note

    For correlating with Data Factory pipeline runs, this example appends the pipeline run ID from the data factory to the output folder. This helps keep track of the files generated by each run.

    Appended pipeline run ID
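
For reference, here's a minimal sketch of how the Transformation notebook can read the base parameters passed from Data Factory and write its result into a per-run folder named after the pipeline run ID. The parameter names (input, output, filename, pipelineRunId) and the CSV format are illustrative assumptions; match them to the Base parameters actually configured on the Notebook activity.

    # Runs inside the Databricks notebook; dbutils and spark are predefined there.
    # Parameter names below are illustrative -- align them with the Notebook activity's Base parameters.
    dbutils.widgets.text("input", "")
    dbutils.widgets.text("output", "")
    dbutils.widgets.text("filename", "")
    dbutils.widgets.text("pipelineRunId", "")

    input_path = dbutils.widgets.get("input")
    output_path = dbutils.widgets.get("output")
    file_name = dbutils.widgets.get("filename")
    run_id = dbutils.widgets.get("pipelineRunId")

    # Read the staged file that the Copy data activity landed under the mount (CSV assumed).
    df = spark.read.option("header", "true").csv(input_path + "/" + file_name)

    # Write the result into a folder named after the pipeline run ID so each run's output is traceable.
    df.write.mode("overwrite").option("header", "true").csv(output_path + "/" + run_id)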

Next steps