PowerShell script - transform data in the cloud using Azure Data Factory

This sample PowerShell script creates a pipeline that transforms data in the cloud by running a Spark program on an Azure HDInsight Spark cluster.

Note

This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure PowerShell.

This sample requires Azure PowerShell. Run Get-Module -ListAvailable Az to find the version. If you need to install or upgrade, see Install Azure PowerShell module.

Run the Connect-AzAccount -Environment AzureChinaCloud cmdlet to connect to Azure China.
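
As a quick reference, a minimal setup sketch might look like the following (assuming you are allowed to install modules for the current user):

    # Check which versions of the Az module are available locally
    Get-Module -ListAvailable Az

    # Install the Az module for the current user if it is missing
    Install-Module -Name Az -Scope CurrentUser

    # Sign in to the Azure China cloud
    Connect-AzAccount -Environment AzureChinaCloud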

Prerequisites

  • Azure Storage account. Create a Python script and an input file, and upload them to Azure Storage. The output from the Spark program is stored in this storage account. The on-demand Spark cluster uses the same storage account as its primary storage.

Upload the Python script to your Blob storage account

  1. Create a Python file named WordCount_Spark.py with the following content:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Replace <storageaccountname> with the name of your Azure Storage account, and then save the file.

  3. In your Azure Blob storage, create a container named adftutorial if it does not already exist.

  4. Create a folder named spark in the container.

  5. Create a subfolder named script under the spark folder.

  6. Upload the WordCount_Spark.py file to the script subfolder.

Upload the input file

  1. Create a file named minecraftstory.txt with some text. The Spark program counts the number of words in this text.
  2. Create a subfolder named inputfiles in the spark folder of the blob container.
  3. Upload minecraftstory.txt to the inputfiles subfolder. The sketch after this list shows one way to script the container and upload steps.
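
If you prefer to script the container creation and both uploads, a minimal sketch using the Az.Storage cmdlets might look like the following. The account name, account key, and local file paths are placeholders you must supply:

    # Build a storage context for the Azure China endpoint
    $ctx = New-AzStorageContext -StorageAccountName "<storageaccountname>" `
        -StorageAccountKey "<storageaccountkey>" -Endpoint "core.chinacloudapi.cn"

    # Create the adftutorial container if it does not already exist
    New-AzStorageContainer -Name "adftutorial" -Context $ctx -ErrorAction SilentlyContinue

    # Upload the Spark script and the input file; the folder structure
    # is implied by the blob names
    Set-AzStorageBlobContent -File ".\WordCount_Spark.py" -Container "adftutorial" `
        -Blob "spark/script/WordCount_Spark.py" -Context $ctx
    Set-AzStorageBlobContent -File ".\minecraftstory.txt" -Container "adftutorial" `
        -Blob "spark/inputfiles/minecraftstory.txt" -Context $ctx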

Sample script

Important

This script creates JSON files that define the Data Factory entities (linked services and a pipeline) on your hard drive in the c:\ folder.

Set-ExecutionPolicy Unrestricted -Scope CurrentUser

# Set variables with your own values
$resourceGroupName = "<Azure resource group name>"
$dataFactoryName = "<Data factory name. Must be globally unique.>"
$dataFactoryRegion = "China East 2" 
$storageAccountName = "<Azure Storage account name>"
$storageAccountKey = "<Azure Storage account key>"
$subscriptionID = "<Azure subscription ID>"
$tenantID = "<tenant ID>"
$servicePrincipalID = "<Active directory service principal ID>"
$servicePrincipalKey = "<Active directory service principal key>"

$pipelineName = "SparkTransformPipeline"

# Create a resource group
New-AzResourceGroup -Name $resourceGroupName -Location $dataFactoryRegion

# Create a data factory
$df = Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $dataFactoryRegion -Name $dataFactoryName

# Create an Azure Storage linked service in the data factory

## JSON definition of the linked service. 
$storageLinkedServiceDefinition = @"
{
    "name": "AzureStorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": {
                "value": "DefaultEndpointsProtocol=https;AccountName=$storageAccountName;AccountKey=$storageAccountKey;EndpointSuffix=core.chinacloudapi.cn",
                "type": "SecureString"
            }
        }
    }
}
"@

## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryV2LinkedService command.
$storageLinkedServiceDefinition | Out-File c:\AzureStorageLinkedService.json

## Create the Azure Storage linked service
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File c:\AzureStorageLinkedService.json

# Create an on-demand Spark linked service in the data factory

## JSON definition of the linked service. 
$sparkLinkedServiceDefinition = @"
{
    "name": "OnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "$subscriptionID",
        "servicePrincipalId": "$servicePrincipalID",
        "servicePrincipalKey": {
          "value": "$servicePrincipalKey",
          "type": "SecureString"
        },
        "tenant": "$tenantID",
        "clusterResourceGroup": "$resourceGroupName",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "AzureStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}
"@

## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryV2LinkedService command.
$sparkLinkedServiceDefinition | Out-File c:\OnDemandSparkLinkedService.json

# Create the on-demand Spark linked service
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "OnDemandSparkLinkedService" -File "c:\OnDemandSparkLinkedService.json"

# Create a pipeline in the data factory

## JSON definition of the pipeline
$pipelineDefinition = @"
{
  "name": "SparkTransformPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "OnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "AzureStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}
"@

## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryV2Pipeline command.
$pipelineDefinition | Out-File c:\SparkTransformPipeline.json

## Create a pipeline with a Spark activity in the data factory
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SparkTransformPipeline" -File "c:\SparkTransformPipeline.json"

# Create a pipeline run 

## JSON definition for dummy pipeline parameters
$pipelineParameters = @"
{
    "dummy":  "b"
}
"@

## IMPORTANT: store the JSON definition in a file that will be used by the Invoke-AzDataFactoryV2Pipeline command.
$pipelineParameters | Out-File c:\PipelineParameters.json

# Create a pipeline run by using parameters
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName -ParameterFile c:\PipelineParameters.json

# Check the pipeline run status until it finishes
Start-Sleep -Seconds 30
while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        Start-Sleep -Seconds 300
    }
    else {
        Write-Host "Pipeline $pipelineName run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
}

# Get the activity run details 
$result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName `
    -PipelineRunId $runId `
    -RunStartedAfter (Get-Date).AddMinutes(-30) `
    -RunStartedBefore (Get-Date).AddMinutes(30) `
    -ErrorAction Stop

$result

if ($result.Status -eq "Succeeded") {
    $result.Output -join "`r`n"
}
else {
    $result.Error -join "`r`n"
}

# To remove the data factory from the resource group
# Remove-AzDataFactoryV2 -Name $dataFactoryName -ResourceGroupName $resourceGroupName
#
# To remove the whole resource group
# Remove-AzResourceGroup -Name $resourceGroupName

Clean up deployment

After you run the sample script, you can use the following command to remove the resource group and all resources associated with it:

Remove-AzResourceGroup -Name $resourceGroupName

To remove the data factory from the resource group, run the following command:

Remove-AzDataFactoryV2 -Name $dataFactoryName -ResourceGroupName $resourceGroupName

Script explanation

This script uses the following commands:

  • New-AzResourceGroup: Creates a resource group in which all resources are stored.
  • Set-AzDataFactoryV2: Creates a data factory.
  • Set-AzDataFactoryV2LinkedService: Creates a linked service in the data factory. A linked service links a data store or compute to a data factory.
  • Set-AzDataFactoryV2Pipeline: Creates a pipeline in the data factory. A pipeline contains one or more activities that perform a certain operation. In this pipeline, a Spark activity transforms data by running a program on an Azure HDInsight Spark cluster.
  • Invoke-AzDataFactoryV2Pipeline: Creates a run for the pipeline. In other words, runs the pipeline.
  • Get-AzDataFactoryV2ActivityRun: Gets details about the run of the activity (activity run) in the pipeline.
  • Remove-AzResourceGroup: Deletes a resource group, including all nested resources.
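
If you want to inspect the overall pipeline run rather than the individual activity runs, Get-AzDataFactoryV2PipelineRun offers a complementary view. A minimal usage sketch, reusing the variables defined in the sample script:

    # Look up the pipeline run created earlier by Invoke-AzDataFactoryV2Pipeline
    $run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName `
        -DataFactoryName $dataFactoryName -PipelineRunId $runId

    # Overall status of the run (for example, InProgress or Succeeded)
    $run.Status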

Next steps

For more information on Azure PowerShell, see the Azure PowerShell documentation.

Additional Azure Data Factory PowerShell script samples can be found in Azure Data Factory PowerShell samples.