在 Azure 数据工厂中使用 Spark 活动转换云中的数据

适用于:Azure 数据工厂 Azure Synapse Analytics

本教程使用 Azure PowerShell 创建一个数据工厂管道,该管道可以使用 Spark 活动和按需 HDInsight 链接服务转换数据。 在本教程中执行以下步骤:

  • 创建数据工厂。
  • 创作并部署链接服务。
  • 创作并部署管道。
  • 启动管道运行。
  • 监视管道运行。

如果没有 Azure 订阅,可在开始前创建一个试用帐户

先决条件

注意

建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az

  • Azure 存储帐户。 创建 Python 脚本和输入文件,并将其上传到 Azure 存储。 Spark 程序的输出存储在此存储帐户中。 按需 Spark 群集使用相同的存储帐户作为其主存储。
  • Azure PowerShell。 遵循如何安装和配置 Azure PowerShell 中的说明。

将 Python 脚本上传到 Blob 存储帐户

  1. 创建包含以下内容的名为 WordCount_Spark.py 的 Python 文件:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. <storageAccountName> 替换为 Azure 存储帐户的名称。 然后保存文件。

  3. 在 Azure Blob 存储中,创建名为 adftutorial 的容器(如果尚不存在)。

  4. 创建名为 spark 的文件夹。

  5. spark 文件夹中创建名为 script 的子文件夹。

  6. WordCount_Spark.py 文件上传到 script 子文件夹。

上传输入文件

  1. 创建包含一些文本的名为 minecraftstory.txt 的文件。 Spark 程序会统计此文本中的单词数量。
  2. spark 文件夹中创建名为 inputfiles 的子文件夹。
  3. minecraftstory.txt 上传到 inputfiles 子文件夹。

创作链接服务

在本部分中创作两个链接服务:

  • 一个 Azure 存储链接服务,用于将 Azure 存储帐户链接到数据工厂。 按需 HDInsight 群集使用此存储。 此存储还包含要执行的 Spark 脚本。
  • 一个按需 HDInsight 链接服务。 Azure 数据工厂自动创建 HDInsight 群集,运行 Spark 程序,然后在 HDInsight 群集空闲预配置的时间后将其删除。

Azure 存储链接服务

使用偏好的编辑器创建一个 JSON 文件,复制 Azure 存储链接服务的以下 JSON 定义,并将该文件另存为 MyStorageLinkedService.json

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>;EndpointSuffix=core.chinacloudapi.cn"
      }
    }
}

使用 Azure 存储帐户的名称和密钥更新 <storageAccountName> 和 <storageAccountKey>。

按需 HDInsight 链接服务

使用偏好的编辑器创建一个 JSON 文件,复制 Azure HDInsight 链接服务的以下 JSON 定义,并将该文件另存为 MyOnDemandSparkLinkedService.json

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

更新链接服务定义中以下属性的值:

  • hostSubscriptionId。 将 <subscriptionID> 替换为 Azure 订阅的 ID。 按需 HDInsight 群集在此 Azure 订阅中创建。
  • tenant。 将 <tenantID> 替换为 Azure 租户的 ID。
  • servicePrincipalIdservicePrincipalKey。 将 <servicePrincipalID> 和 <servicePrincipalKey> 分别替换为 Microsoft Entra ID 中服务主体的 ID 和密钥。 此服务主体需是订阅“参与者”角色的成员,或创建群集的资源组的成员。 有关详细信息,请参阅创建 Microsoft Entra 应用程序和服务主体。 服务主体 ID 等效于应用程序 ID,服务主体密钥等效于客户端密码的值 。
  • clusterResourceGroup。 将 <resourceGroupOfHDICluster> 替换为需要在其中创建资源组的 HDInsight 群集的名称。

注意

Azure HDInsight 会限制可在其支持的每个 Azure 区域中使用的核心总数。 对于按需 HDInsight 链接服务,将在 Azure 存储用作其主存储的同一位置创建 HDInsight 群集。 请确保有足够的核心配额,以便能够成功创建群集。 有关详细信息,请参阅使用 Hadoop、Spark、Kafka 等在 HDInsight 中设置群集

创作管道

本步骤创建包含 Spark 活动的新管道。 该活动使用单词计数示例。 从此位置下载内容(如果尚未这样做)。

在偏好的编辑器中创建一个 JSON 文件,复制管道定义的以下 JSON 定义,并将该文件另存为 MySparkOnDemandPipeline.json

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

请注意以下几点:

  • rootPath 指向 adftutorial 容器的 spark 文件夹。
  • entryFilePath 指向 spark 文件夹的 script 子文件夹中的 WordCount_Spark.py 文件。

创建数据工厂

已在 JSON 文件中创作链接服务和管道定义。 现在,让我们创建一个数据工厂,并使用 PowerShell cmdlet 部署链接服务和管道 JSON 文件。 逐条运行以下 PowerShell 命令:

  1. 逐个设置变量。

    资源组名称

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    数据工厂名称。 必须全局唯一

    $dataFactoryName = "MyDataFactory09102017"
    

    管道名称

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. 启动 PowerShell。 在完成本快速入门之前,请将 Azure PowerShell 保持打开状态。 如果将它关闭再重新打开,则需要再次运行下述命令。 若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库,等等)和计算资源(HDInsight 等)可以位于其他区域中。

    运行以下命令并输入用于登录 Azure 门户的用户名和密码:

    Connect-AzAccount -Environment AzureChinaCloud
    

    运行以下命令查看此帐户的所有订阅:

    Get-AzSubscription
    

    运行以下命令选择要使用的订阅。 请将 SubscriptionId 替换为自己的 Azure 订阅的 ID:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. 创建资源组:ADFTutorialResourceGroup。

    New-AzResourceGroup -Name $resourceGroupName -Location "China East 2" 
    
  4. 创建数据工厂。

     $df = Set-AzDataFactoryV2 -Location ChinaEast 2 -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    执行以下命令查看输出:

    $df
    
  5. 切换到在其中创建了 JSON 文件的文件夹,并运行以下命令部署 Azure 存储链接服务:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. 运行以下命令部署按需 Spark 链接服务:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. 运行以下命令部署管道:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

启动并监视管道运行

  1. 启动管道运行。 该命令还会捕获管道运行 ID 用于将来的监视。

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. 运行以下脚本来持续检查管道运行状态,直到运行完成为止。

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. 下面是示例运行的输出:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.cn/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (China East 2)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. 确认是否在 adftutorial 容器的 spark 文件夹中创建了包含 spark 程序的输出的、名为 outputfiles 的文件夹。

此示例中的管道将数据从 Azure Blob 存储中的一个位置复制到另一个位置。 你已了解如何执行以下操作:

  • 创建数据工厂。
  • 创作并部署链接服务。
  • 创作并部署管道。
  • 启动管道运行。
  • 监视管道运行。

继续学习下一篇教程,了解如何通过运行 Azure HDInsight 群集上的 Hive 脚本,转换虚拟网络中的数据。