Transform data in the cloud by using a Spark activity in Azure Data Factory

Applies to: Azure Data Factory, Azure Synapse Analytics (Preview)

In this tutorial, you use the Azure portal to create an Azure Data Factory pipeline. The pipeline transforms data by using a Spark activity and an on-demand Azure HDInsight linked service.

You perform the following steps in this tutorial:

  • Create a data factory.
  • Create a pipeline that uses a Spark activity.
  • Trigger a pipeline run.
  • Monitor the pipeline run.

If you don't have an Azure subscription, create a 1-RMB trial account before you begin.

Prerequisites

Note

This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure PowerShell.

  • Azure storage account. You create a Python script and an input file, and you upload them to Azure Storage. The output from the Spark program is stored in this storage account. The on-demand Spark cluster uses the same storage account as its primary storage.

Note

HDInsight supports only general-purpose storage accounts with the Standard tier. Make sure that the account is not a premium or blob-only storage account.

Upload the Python script to your Blob storage account

  1. Create a Python file named WordCount_Spark.py with the following content:

    import sys
    from operator import add

    from pyspark.sql import SparkSession

    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()

        # Read the input file from Blob storage and keep only the text of each line.
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])

        # Split each line into words and count how many times each word occurs.
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)

        # Write the (word, count) pairs back to Blob storage.
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/outputfiles/wordcount")

        spark.stop()

    if __name__ == "__main__":
        main()
    
  2. Replace <storageaccountname> with the name of your Azure storage account. Then, save the file.

  3. In Azure Blob storage, create a container named adftutorial if it does not already exist.

  4. Create a folder named spark in the adftutorial container.

  5. Create a subfolder named script under the spark folder.

  6. Upload the WordCount_Spark.py file to the script subfolder.

Upload the input file

  1. Create a file named minecraftstory.txt with some text. The Spark program counts the number of words in this text.
  2. Create a subfolder named inputfiles in the spark folder.
  3. Upload the minecraftstory.txt file to the inputfiles subfolder. (A scripted alternative to these upload steps is sketched after this list.)
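
If you prefer to script the container and upload steps above, the following is a minimal sketch that uses the azure-storage-blob Python package (an assumption; the tutorial itself uses the portal). The account URL, account key, and local file paths are placeholders that you must replace.

    # Sketch only: assumes the azure-storage-blob package and placeholder credentials.
    from azure.storage.blob import BlobServiceClient

    service = BlobServiceClient(
        account_url="https://<storageaccountname>.blob.core.chinacloudapi.cn",
        credential="<storageaccountkey>",
    )

    # Create the adftutorial container if it does not already exist.
    container = service.get_container_client("adftutorial")
    if not container.exists():
        container.create_container()

    # Upload the Spark script and the input file to the folders the tutorial expects.
    with open("WordCount_Spark.py", "rb") as f:
        container.upload_blob("spark/script/WordCount_Spark.py", f, overwrite=True)
    with open("minecraftstory.txt", "rb") as f:
        container.upload_blob("spark/inputfiles/minecraftstory.txt", f, overwrite=True)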

Create a data factory

  1. Launch the Microsoft Edge or Google Chrome web browser. Currently, the Data Factory UI is supported only in the Microsoft Edge and Google Chrome web browsers.

  2. Select New on the left menu, select Data + Analytics, and then select Data Factory.

    Selecting Data Factory in the New pane

  3. In the New data factory pane, enter ADFTutorialDataFactory under Name.

    The New data factory pane

    The name of the Azure data factory must be globally unique. If you see the following error, change the name of the data factory. (For example, use <yourname>ADFTutorialDataFactory.) For naming rules for Data Factory artifacts, see the Data Factory - naming rules article.

    Error when the name isn't available

  4. For Subscription, select the Azure subscription in which you want to create the data factory.

  5. For Resource Group, take one of the following steps:

    • Select Use existing, and select an existing resource group from the drop-down list.
    • Select Create new, and enter the name of a resource group.

    Some of the steps in this tutorial assume that you use the name ADFTutorialResourceGroup for the resource group. To learn about resource groups, see Using resource groups to manage your Azure resources.

  6. For Version, select V2.

  7. For Location, select the location for the data factory.

    For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. The data stores (like Azure Storage and Azure SQL Database) and computes (like HDInsight) that Data Factory uses can be in other regions.

  8. Select Create.

  9. After the creation is complete, you see the Data factory page. Select the Author & Monitor tile to start the Data Factory UI application on a separate tab.

    Home page for the data factory, with the Author & Monitor tile
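
If you want to create the data factory programmatically instead of through the portal, the following is a minimal sketch that uses the azure-mgmt-datafactory and azure-identity Python packages (an assumption; client and model names can vary by SDK version, and in the Azure China cloud the client may also need to be pointed at the China Resource Manager endpoint). The subscription ID, factory name, and region are placeholders.

    # Sketch only: create a V2 data factory with the Azure SDK for Python.
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import Factory

    subscription_id = "<subscriptionId>"
    resource_group = "ADFTutorialResourceGroup"
    factory_name = "<yourname>ADFTutorialDataFactory"  # must be globally unique

    adf_client = DataFactoryManagementClient(DefaultAzureCredential(), subscription_id)

    factory = adf_client.factories.create_or_update(
        resource_group,
        factory_name,
        Factory(location="<location>"),  # pick a region where Data Factory is available
    )
    print(factory.provisioning_state)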

Create linked services

You author two linked services in this section:

  • An Azure Storage linked service that links an Azure storage account to the data factory. This storage is used by the on-demand HDInsight cluster. It also contains the Spark script to be run.
  • An on-demand HDInsight linked service. Azure Data Factory automatically creates an HDInsight cluster and runs the Spark program. It then deletes the HDInsight cluster after the cluster has been idle for a preconfigured time.

Create an Azure Storage linked service

  1. On the Let's get started page, switch to the Edit tab in the left panel.

    The Let's get started page

  2. Select Connections at the bottom of the window, and then select + New.

    Button for creating a new connection

  3. In the New Linked Service window, select Data Store > Azure Blob Storage, and then select Continue.

    Selecting the Azure Blob Storage tile

  4. For Storage account name, select the name from the list, and then select Save.

    Box for specifying the storage account name
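
Behind the scenes, the UI publishes this linked service as a JSON definition. A rough sketch of that definition, expressed here as a Python dict so it can be reused with the Data Factory SDK or REST API, looks like the following (the connection string values are placeholders, and the exact property set may differ slightly from what the UI generates):

    # Sketch only: approximate JSON definition of the Azure Blob storage linked service.
    import json

    azure_blob_storage_linked_service = {
        "name": "AzureBlobStorage1",
        "properties": {
            "type": "AzureBlobStorage",
            "typeProperties": {
                "connectionString": (
                    "DefaultEndpointsProtocol=https;"
                    "AccountName=<storageaccountname>;"
                    "AccountKey=<storageaccountkey>;"
                    "EndpointSuffix=core.chinacloudapi.cn"
                )
            },
        },
    }
    print(json.dumps(azure_blob_storage_linked_service, indent=2))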

Create an on-demand HDInsight linked service

  1. Select the + New button again to create another linked service.

  2. In the New Linked Service window, select Compute > Azure HDInsight, and then select Continue.

    Selecting the Azure HDInsight tile

  3. In the New Linked Service window, complete the following steps:

    a. For Name, enter AzureHDInsightLinkedService.

    b. For Type, confirm that On-demand HDInsight is selected.

    c. For Azure Storage Linked Service, select AzureBlobStorage1. You created this linked service earlier. If you used a different name, specify the right name here.

    d. For Cluster type, select spark.

    e. For Service principal id, enter the ID of a service principal that has permission to create an HDInsight cluster.

    This service principal needs to be a member of the Contributor role of the subscription, or of the resource group in which the cluster is created. For more information, see Create an Azure Active Directory application and service principal. The Service principal id is equivalent to the Application ID, and the Service principal key is equivalent to the value of a Client secret.

    f. For Service principal key, enter the key.

    g. For Resource group, select the same resource group that you used when you created the data factory. The Spark cluster is created in this resource group.

    h. Expand OS type.

    i. Enter a name for Cluster user name.

    j. Enter the Cluster password for the user.

    k. Select Finish.

    HDInsight linked service settings
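
The on-demand HDInsight linked service is also published as a JSON definition. The sketch below, again expressed as a Python dict, shows roughly how the settings above map to HDInsightOnDemand linked service properties; the cluster size, time to live, HDInsight version, and all placeholder values are assumptions that you should adjust to your own environment.

    # Sketch only: approximate JSON definition of the on-demand HDInsight linked service.
    on_demand_hdinsight_linked_service = {
        "name": "AzureHDInsightLinkedService",
        "properties": {
            "type": "HDInsightOnDemand",
            "typeProperties": {
                "clusterType": "spark",
                "clusterSize": 2,            # assumed size; adjust to your core quota
                "timeToLive": "00:15:00",    # idle time before the cluster is deleted
                "version": "3.6",            # assumed HDInsight version
                "osType": "Linux",
                "hostSubscriptionId": "<subscriptionId>",
                "servicePrincipalId": "<servicePrincipalId>",
                "servicePrincipalKey": {"type": "SecureString", "value": "<servicePrincipalKey>"},
                "tenant": "<tenantId>",
                "clusterResourceGroup": "ADFTutorialResourceGroup",
                "clusterUserName": "<clusterUserName>",
                "clusterPassword": {"type": "SecureString", "value": "<clusterPassword>"},
                "linkedServiceName": {
                    "referenceName": "AzureBlobStorage1",
                    "type": "LinkedServiceReference",
                },
            },
        },
    }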

Note

Azure HDInsight limits the total number of cores that you can use in each Azure region that it supports. For the on-demand HDInsight linked service, the HDInsight cluster is created in the same Azure Storage location that's used as its primary storage. Ensure that you have enough core quota for the cluster to be created successfully. For more information, see Set up clusters in HDInsight with Hadoop, Spark, Kafka, and more.

Create a pipeline

  1. Select the + (plus) button, and then select Pipeline on the menu.

    Button for creating a new pipeline

  2. In the Activities toolbox, expand HDInsight. Drag the Spark activity from the Activities toolbox to the pipeline designer surface.

    Dragging the Spark activity

  3. In the properties for the Spark activity window at the bottom, complete the following steps:

    a. Switch to the HDI Cluster tab.

    b. Select AzureHDInsightLinkedService (which you created in the previous procedure).

    Specifying the HDInsight linked service

  4. Switch to the Script/Jar tab, and complete the following steps:

    a. For Job Linked Service, select AzureBlobStorage1.

    b. Select Browse Storage.

    Specifying the Spark script on the Script/Jar tab

    c. Browse to the adftutorial/spark/script folder, select WordCount_Spark.py, and then select Finish.

  5. To validate the pipeline, select the Validate button on the toolbar. Select the >> (right arrow) button to close the validation window.

    Validate button

  6. Select Publish All. The Data Factory UI publishes the entities (linked services and pipeline) to the Azure Data Factory service.

    Publish All button
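
The published pipeline corresponds roughly to a JSON definition like the following, again sketched as a Python dict. The pipeline and activity names are placeholders (the UI assigns defaults); the rootPath, entryFilePath, and linked service references match the values you entered above.

    # Sketch only: approximate JSON definition of the pipeline with the Spark activity.
    spark_pipeline = {
        "name": "<pipelineName>",
        "properties": {
            "activities": [
                {
                    "name": "<sparkActivityName>",
                    "type": "HDInsightSpark",
                    "linkedServiceName": {
                        "referenceName": "AzureHDInsightLinkedService",
                        "type": "LinkedServiceReference",
                    },
                    "typeProperties": {
                        # Folder that holds the Spark job, relative to the storage account.
                        "rootPath": "adftutorial/spark",
                        # Entry-point script, relative to rootPath.
                        "entryFilePath": "script/WordCount_Spark.py",
                        # Storage linked service that holds the script (Job Linked Service).
                        "sparkJobLinkedService": {
                            "referenceName": "AzureBlobStorage1",
                            "type": "LinkedServiceReference",
                        },
                    },
                }
            ]
        },
    }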

Trigger a pipeline run

Select Add Trigger on the toolbar, and then select Trigger Now.

Trigger and Trigger Now buttons
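
To trigger the published pipeline from code instead of from the toolbar, a minimal sketch with the azure-mgmt-datafactory package looks like the following (the subscription, factory, and pipeline names are placeholders):

    # Sketch only: trigger a run of the published pipeline with the Azure SDK for Python.
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.datafactory import DataFactoryManagementClient

    adf_client = DataFactoryManagementClient(DefaultAzureCredential(), "<subscriptionId>")

    run_response = adf_client.pipelines.create_run(
        "ADFTutorialResourceGroup",
        "<dataFactoryName>",
        "<pipelineName>",
        parameters={},
    )
    print("Pipeline run ID:", run_response.run_id)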

Monitor the pipeline run

  1. Switch to the Monitor tab. Confirm that you see a pipeline run. It takes approximately 20 minutes to create a Spark cluster.

  2. Select Refresh periodically to check the status of the pipeline run.

    Tab for monitoring pipeline runs, with the Refresh button

  3. To see activity runs associated with the pipeline run, select View Activity Runs in the Actions column.

    Pipeline run status

    You can switch back to the pipeline runs view by selecting the All Pipeline Runs link at the top.

    Activity runs view
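
The same status information is available from code. The following sketch polls the run that was started earlier until it finishes (the run ID comes from the create_run response shown in the previous section; the other names are placeholders):

    # Sketch only: poll the pipeline run status with the Azure SDK for Python.
    import time

    from azure.identity import DefaultAzureCredential
    from azure.mgmt.datafactory import DataFactoryManagementClient

    adf_client = DataFactoryManagementClient(DefaultAzureCredential(), "<subscriptionId>")

    while True:
        run = adf_client.pipeline_runs.get(
            "ADFTutorialResourceGroup", "<dataFactoryName>", "<runId>"
        )
        print("Pipeline run status:", run.status)
        if run.status not in ("Queued", "InProgress"):
            break
        time.sleep(60)  # creating the Spark cluster alone takes about 20 minutes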

Verify the output

Verify that an output file is created in the spark/outputfiles/wordcount folder of the adftutorial container.

Location of the output file

The file should contain each word from the input text file and the number of times the word appeared in the file. For example:

(u'This', 1)
(u'a', 1)
(u'is', 1)
(u'test', 1)
(u'file', 1)
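
To check the output from code rather than by browsing the container in the portal, a minimal sketch with the azure-storage-blob package looks like the following (account name and key are placeholders):

    # Sketch only: list and print the Spark output files with azure-storage-blob.
    from azure.storage.blob import BlobServiceClient

    service = BlobServiceClient(
        account_url="https://<storageaccountname>.blob.core.chinacloudapi.cn",
        credential="<storageaccountkey>",
    )
    container = service.get_container_client("adftutorial")

    # Spark writes the word counts as one or more part-* files under the wordcount folder.
    for blob in container.list_blobs(name_starts_with="spark/outputfiles/wordcount/"):
        print(blob.name)
        if blob.name.rsplit("/", 1)[-1].startswith("part-"):
            print(container.download_blob(blob.name).readall().decode("utf-8"))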

Next steps

The pipeline in this sample transforms data by using a Spark activity and an on-demand HDInsight linked service. You learned how to:

  • Create a data factory.
  • Create a pipeline that uses a Spark activity.
  • Trigger a pipeline run.
  • Monitor the pipeline run.

To learn how to transform data by running a Hive script on an Azure HDInsight cluster that's in a virtual network, advance to the next tutorial: