PowerShell script - transform data in cloud using Azure Data Factory
This sample PowerShell script creates a pipeline that transforms data in the cloud by running Spark program on an Azure HDInsight Spark cluster.
Note
We recommend that you use the Azure Az PowerShell module to interact with Azure. See Install Azure PowerShell to get started. To learn how to migrate to the Az PowerShell module, see Migrate Azure PowerShell from AzureRM to Az.
This sample requires Azure PowerShell. Run Get-Module -ListAvailable Az
to find the version.
If you need to install or upgrade, see Install Azure PowerShell module.
Run the Connect-AzAccount -Environment AzureChinaCloud cmdlet to connect to Azure operated by 21Vianet.
Prerequisites
- Azure Storage account. Create a Python script and an input file, and upload them to the Azure storage. The output from the spark program is stored in this storage account. The on-demand Spark cluster uses the same storage account as its primary storage.
Upload Python script to your Blob Storage account
Create a Python file named WordCount_Spark.py with the following content:
import sys from operator import add from pyspark.sql import SparkSession def main(): spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.chinacloudapi.cn/spark/outputfiles/wordcount") spark.stop() if __name__ == "__main__": main()
Replace <storageAccountName> with the name of your Azure Storage account. Then, save the file.
In your Azure Blob Storage, create a container named adftutorial if it does not exist.
Create a folder named spark.
Create a subfolder named script under spark folder.
Upload the WordCount_Spark.py file to the script subfolder.
Upload the input file
- Create a file named minecraftstory.txt with some text. The spark program counts the number of words in this text.
- Create a subfolder named
inputfiles
in thespark
folder of the blob container. - Upload the
minecraftstory.txt
to theinputfiles
subfolder.
Sample script
Important
This script creates JSON files that define Data Factory entities (linked service, dataset, and pipeline) on your hard drive in the c:\ folder.
powershell Set-ExecutionPolicy Unrestricted -Scope CurrentUser
# Set variables with your own values
$resourceGroupName = "<Azure resource group name>"
$dataFactoryName = "<Data factory name. Must be globally unique.>"
$dataFactoryRegion = "China East 2"
$storageAccountName = "<Az.Storage account name> "
$storageAccountKey = "<Az.Storage account key>"
$subscriptionID = "<Azure subscription ID>"
$tenantID = "<tenant ID>"
$servicePrincipalID = "<Active directory service principal ID>"
$servicePrincipalKey = "<Active directory service principal key>"
$pipelineName = "SparkTransformPipeline"
# Create a resource group
New-AzResourceGroup -Name $resourceGroupName -Location $dataFactoryRegion
# Create a data factory
$df = Set-AzDataFactory -ResourceGroupName $resourceGroupName -Location $dataFactoryRegion -Name $dataFactoryName
# Create an Az.Storage linked service in the data factory
## JSON definition of the linked service.
$storageLinkedServiceDefinition = @"
{
"name": "AzureStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": {
"value": "DefaultEndpointsProtocol=https;AccountName=$storageAccountName;AccountKey=$storageAccountKey;EndpointSuffix=core.chinacloudapi.cn",
"type": "SecureString"
}
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryLinkedService command.
$storageLinkedServiceDefinition | Out-File c:\AzureStorageLinkedService.json
## Creates an Az.Storage linked service
Set-AzDataFactoryLinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File c:\AzureStorageLinkedService.json
# Create on-demand Spark linked service in the data factory
## JSON definition of the linked service.
$sparkLinkedServiceDefinition = @"
{
"name": "OnDemandSparkLinkedService",
"properties": {
"type": "HDInsightOnDemand",
"typeProperties": {
"clusterSize": 2,
"clusterType": "spark",
"timeToLive": "00:15:00",
"hostSubscriptionId": "$subscriptionID",
"servicePrincipalId": "$servicePrincipalID",
"servicePrincipalKey": {
"value": "$servicePrincipalKey",
"type": "SecureString"
},
"tenant": "$tenantID",
"clusterResourceGroup": "$resourceGroupName",
"version": "3.6",
"osType": "Linux",
"clusterNamePrefix":"ADFSparkSample",
"linkedServiceName": {
"referenceName": "AzureStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryLinkedService command.
$sparkLinkedServiceDefinition | Out-File c:\OnDemandSparkLinkedService.json
# Creates an on-demand Spark linked service
Set-AzDataFactoryLinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "OnDemandSparkLinkedService" -File "C:\OnDemandSparkLinkedService.json"
# Create a pipeline in the data factory
## JSON definition of the pipeline
$pipelineDefinition = @"
{
"name": "SparkTransformPipeline",
"properties": {
"activities": [
{
"name": "MySparkActivity",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "OnDemandSparkLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"rootPath": "adftutorial/spark",
"entryFilePath": "script/WordCount_Spark.py",
"getDebugInfo": "Failure",
"sparkJobLinkedService": {
"referenceName": "AzureStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
]
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryPipeline command.
$pipelineDefinition | Out-File c:\SparkTransformPipeline.json
## Create a pipeline with Spark Activity in the data factory
Set-AzDataFactoryPipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SparkTransformPipeline" -File "c:\SparkTransformPipeline.json"
# Create a pipeline run
## JSON definition for dummy pipeline parameters
$pipelineParameters = @"
{
"dummy": "b"
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Invoke-AzDataFactoryPipeline command.
$pipelineParameters | Out-File c:\PipelineParameters.json
# Create a pipeline run by using parameters
$runId = Invoke-AzDataFactoryPipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName -ParameterFile c:\PipelineParameters.json
# Check the pipeline run status until it finishes
Start-Sleep -Seconds 30
while ($True) {
$result = Get-AzDataFactoryActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
if (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
Start-Sleep -Seconds 300
}
else {
Write-Host "Pipeline $pipelineName run finished. Result:" -foregroundcolor "Yellow"
$result
break
}
}
# Get the activity run details
$result = Get-AzDataFactoryActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName `
-PipelineRunId $runId `
-RunStartedAfter (Get-Date).AddMinutes(-30) `
-RunStartedBefore (Get-Date).AddMinutes(30) `
-ErrorAction Stop
$result
if ($result.Status -eq "Succeeded") {`
$result.Output -join "`r`n"`
}`
else {`
$result.Error -join "`r`n"`
}
# To remove the data factory from the resource gorup
# Remove-AzDataFactory -Name $dataFactoryName -ResourceGroupName $resourceGroupName
#
# To remove the whole resource group
# Remove-AzResourceGroup -Name $resourceGroupName
Clean up deployment
After you run the sample script, you can use the following command to remove the resource group and all resources associated with it:
Remove-AzResourceGroup -ResourceGroupName $resourceGroupName
To remove the data factory from the resource group, run the following command:
Remove-AzDataFactoryV2 -Name $dataFactoryName -ResourceGroupName $resourceGroupName
Script explanation
This script uses the following commands:
Command | Notes |
---|---|
New-AzResourceGroup | Creates a resource group in which all resources are stored. |
Set-AzDataFactoryV2 | Create a data factory. |
Set-AzDataFactoryV2LinkedService | Creates a linked service in the data factory. A linked service links a data store or compute to a data factory. |
Set-AzDataFactoryV2Pipeline | Creates a pipeline in the data factory. A pipeline contains one or more activities that performs a certain operation. In this pipeline, a spark activity transforms data by running a program on an Azure HDInsight Spark cluster. |
Invoke-AzDataFactoryV2Pipeline | Creates a run for the pipeline. In other words, runs the pipeline. |
Get-AzDataFactoryV2ActivityRun | Gets details about the run of the activity (activity run) in the pipeline. |
Remove-AzResourceGroup | Deletes a resource group including all nested resources. |
Related content
For more information on the Azure PowerShell, see Azure PowerShell documentation.
Additional Azure Data Factory PowerShell script samples can be found in the Azure Data Factory PowerShell samples.