Transform data in the cloud by using a Spark activity in Azure Data Factory

In this tutorial, you use Azure PowerShell to create a Data Factory pipeline that transforms data by using a Spark activity and an on-demand HDInsight linked service. You perform the following steps in this tutorial:

  • Create a data factory.
  • Author and deploy linked services.
  • Author and deploy a pipeline.
  • Start a pipeline run.
  • Monitor the pipeline run.

If you don't have an Azure subscription, create a 1rmb trial account before you begin.

Prerequisites

Note

This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure PowerShell.
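
If the Az module is not installed yet, a minimal sketch of installing it for the current user (assuming PowerShellGet is available, as it is on recent PowerShell versions) is:

Install-Module -Name Az -AllowClobber -Scope CurrentUser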

  • Azure Storage account. You create a Python script and an input file, and you upload them to Azure Storage. The output from the Spark program is stored in this storage account. The on-demand Spark cluster uses the same storage account as its primary storage.
  • Azure PowerShell. Follow the instructions in How to install and configure Azure PowerShell.

Upload the Python script to your Blob storage account

  1. Create a Python file named WordCount_Spark.py with the following content:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Replace <storageAccountName> with the name of your Azure Storage account. Then, save the file.

  3. In your Azure Blob storage, create a container named adftutorial if it does not exist.

  4. Create a folder named spark.

  5. Create a subfolder named script under the spark folder.

  6. Upload the WordCount_Spark.py file to the script subfolder. A PowerShell sketch that automates steps 3 through 6 follows this list.
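
    The following sketch uses the Az.Storage cmdlets to create the container and upload the script. The storage account name and key placeholders, and the -Environment value for Azure China, are assumptions to adjust for your environment:

    # Assumed placeholders - replace with your storage account name and key
    $storageAccountName = "<storageAccountName>"
    $storageAccountKey  = "<storageAccountKey>"
    $context = New-AzStorageContext -StorageAccountName $storageAccountName -StorageAccountKey $storageAccountKey -Environment AzureChinaCloud

    # Create the adftutorial container if it does not already exist
    New-AzStorageContainer -Name "adftutorial" -Context $context -ErrorAction SilentlyContinue

    # Upload the script; the spark/script "folders" are created implicitly by the blob name
    Set-AzStorageBlobContent -File ".\WordCount_Spark.py" -Container "adftutorial" -Blob "spark/script/WordCount_Spark.py" -Context $context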

Upload the input file

  1. Create a file named minecraftstory.txt with some text. The Spark program counts the number of words in this text.
  2. Create a subfolder named inputfiles in the spark folder.
  3. Upload the minecraftstory.txt file to the inputfiles subfolder. A short upload sketch follows this list.
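
    The input file can be uploaded the same way; a short sketch, reusing the $context from the script-upload sketch above:

    # Upload the input text; the spark/inputfiles "folder" is created implicitly by the blob name
    Set-AzStorageBlobContent -File ".\minecraftstory.txt" -Container "adftutorial" -Blob "spark/inputfiles/minecraftstory.txt" -Context $context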

Author linked services

You author two linked services in this section:

  • An Azure Storage linked service that links an Azure Storage account to the data factory. This storage is used by the on-demand HDInsight cluster. It also contains the Spark script to be executed.
  • An on-demand HDInsight linked service. Azure Data Factory automatically creates an HDInsight cluster, runs the Spark program, and then deletes the cluster after it has been idle for a preconfigured time.

Azure Storage linked service

Create a JSON file by using your preferred editor, copy the following JSON definition of an Azure Storage linked service, and then save the file as MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>;EndpointSuffix=core.chinacloudapi.cn"
      }
    }
}

Update <storageAccountName> and <storageAccountKey> with the name and key of your Azure Storage account.
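
If you don't have the account key at hand, you can retrieve it with PowerShell. A minimal sketch, assuming <storageAccountResourceGroup> is the resource group that contains the storage account:

$storageKeys = Get-AzStorageAccountKey -ResourceGroupName "<storageAccountResourceGroup>" -Name "<storageAccountName>"
$storageKeys[0].Value   # Use this value for <storageAccountKey> in MyStorageLinkedService.json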

On-demand HDInsight linked service

Create a JSON file by using your preferred editor, copy the following JSON definition of an Azure HDInsight linked service, and save the file as MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Update the values for the following properties in the linked service definition:

  • hostSubscriptionId. Replace <subscriptionID> with the ID of your Azure subscription. The on-demand HDInsight cluster is created in this subscription.
  • tenant. Replace <tenantID> with the ID of your Azure tenant.
  • servicePrincipalId, servicePrincipalKey. Replace <servicePrincipalID> and <servicePrincipalKey> with the ID and key of your service principal in Azure Active Directory. This service principal needs to be a member of the Contributor role of the subscription, or of the resource group in which the cluster is created. For details, see Create an Azure Active Directory application and service principal. The service principal ID is equivalent to the application ID, and the service principal key is equivalent to the value of a client secret. A PowerShell sketch of creating such a service principal follows this list.
  • clusterResourceGroup. Replace <resourceGroupOfHDICluster> with the name of the resource group in which the HDInsight cluster needs to be created.
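
A hedged sketch of creating such a service principal and granting it the Contributor role at the subscription scope with Az PowerShell follows. The display name is an arbitrary example, and the exact properties returned by New-AzADServicePrincipal vary across Az versions, so inspect its output for the application ID and generated secret:

# Create the service principal (the display name is an arbitrary example)
$sp = New-AzADServicePrincipal -DisplayName "ADFSparkTutorialSP"
$sp | Format-List    # Note the application (client) ID and the generated secret

# Grant it Contributor on the subscription that hosts the on-demand HDInsight cluster
New-AzRoleAssignment -ApplicationId "<servicePrincipalID>" -RoleDefinitionName "Contributor" -Scope "/subscriptions/<subscriptionID>"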

Note

Azure HDInsight limits the total number of cores that you can use in each Azure region that it supports. For the on-demand HDInsight linked service, the HDInsight cluster is created in the same location as the Azure Storage account used as its primary storage. Ensure that you have enough core quota for the cluster to be created successfully. For more information, see Set up clusters in HDInsight with Hadoop, Spark, Kafka, and more.

Author a pipeline

In this step, you create a new pipeline with a Spark activity. The activity uses the word count sample. Download the contents from this location if you haven't already done so.

Create a JSON file in your preferred editor, copy the following JSON definition of a pipeline, and save it as MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Note the following points:

  • rootPath points to the spark folder of the adftutorial container.
  • entryFilePath points to the WordCount_Spark.py file in the script subfolder of the spark folder.

Create a data factory

You have authored linked service and pipeline definitions in JSON files. Now, let's create a data factory, and deploy the linked service and pipeline JSON files by using PowerShell cmdlets. Run the following PowerShell commands one by one:

  1. Set the variables one by one.

    Resource group name

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Data factory name. The name must be globally unique.

    $dataFactoryName = "MyDataFactory09102017"
    

    Pipeline name

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Launch PowerShell. Keep Azure PowerShell open until the end of this tutorial. If you close and reopen it, you need to run these commands again. For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. The data stores (Azure Storage, Azure SQL Database, and so on) and computes (HDInsight and so on) used by the data factory can be in other regions.

    Run the following command, and enter the user name and password that you use to sign in to the Azure portal:

    Connect-AzAccount -Environment AzureChinaCloud
    

    Run the following command to view all the subscriptions for this account:

    Get-AzSubscription
    

    Run the following command to select the subscription that you want to work with. Replace SubscriptionId with the ID of your Azure subscription:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Create the resource group: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "China East 2" 
    
  4. Create the data factory.

     $df = Set-AzDataFactoryV2 -Location "China East 2" -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Execute the following command to see the output:

    $df
    
  5. Switch to the folder where you created the JSON files, and run the following command to deploy the Azure Storage linked service:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Run the following command to deploy the on-demand Spark linked service:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Run the following command to deploy the pipeline:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Start and monitor a pipeline run

  1. Start a pipeline run. The command also captures the pipeline run ID for future monitoring.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Run the following script to continuously check the pipeline run status until it finishes.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Here is the output of the sample run:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.cn/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (China East 2)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Confirm that a folder named outputfiles is created in the spark folder of the adftutorial container with the output from the Spark program. A PowerShell verification sketch follows this step.
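
    To verify from PowerShell instead of the portal, a short sketch that lists the generated output blobs (reusing the storage $context shown in the earlier upload sketch):

    # List the word-count output files written by the Spark job
    Get-AzStorageBlob -Container "adftutorial" -Prefix "spark/outputfiles/wordcount" -Context $context | Select-Object Name, Length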

Next steps

In this sample, the pipeline transformed data by using a Spark activity and an on-demand HDInsight linked service. You learned how to:

  • Create a data factory.
  • Author and deploy linked services.
  • Author and deploy a pipeline.
  • Start a pipeline run.
  • Monitor the pipeline run.

Advance to the next tutorial to learn how to transform data by running a Hive script on an Azure HDInsight cluster that is in a virtual network.