使用 PowerShell 创建用于复制云端数据的数据工厂管道Use PowerShell to create a data factory pipeline to copy data in the cloud

本 PowerShell 脚本示例在 Azure 数据工厂中创建管道,该管道将数据从 Azure Blob 存储中的一个位置复制到另一个位置。This sample PowerShell script creates a pipeline in Azure Data Factory that copies data from one location to another location in an Azure Blob Storage.

备注

本文进行了更新,以便使用新的 Azure PowerShell Az 模块。This article has been updated to use the new Azure PowerShell Az module. 你仍然可以使用 AzureRM 模块,至少在 2020 年 12 月之前,它将继续接收 bug 修补程序。You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. 若要详细了解新的 Az 模块和 AzureRM 兼容性,请参阅新 Azure Powershell Az 模块简介To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. 有关 Az 模块安装说明,请参阅安装 Azure PowerShellFor Az module installation instructions, see Install Azure PowerShell.

本示例需要 Azure PowerShell。This sample requires Azure PowerShell. 运行 Get-Module -ListAvailable Az 即可查找版本。Run Get-Module -ListAvailable Az to find the version. 如果需要进行安装或升级,请参阅安装 Azure PowerShell 模块If you need to install or upgrade, see Install Azure PowerShell module.

运行 Connect-AzAccount -Environment AzureChinaCloud cmdlet 以连接到 Azure 中国。Run the Connect-AzAccount -Environment AzureChinaCloud cmdlet to connect to Azure China.

先决条件Prerequisites

  • Azure 存储帐户Azure Storage account. 可以将 blob 存储同时用作接收器数据存储。You use the blob storage as both the source and sink data stores. 如果没有 Azure 存储帐户,请参阅创建存储帐户创建一个。If you don't have an Azure storage account, see the Create a storage account on creating one.
  • 在 Blob 存储中创建一个 blob 容器,在该容器中创建一个输入文件夹,并向该文件夹上传一些文件。Create a blob container in Blob Storage, create an input folder in the container, and upload some files to the folder. 可以使用 Azure 存储资源管理器等工具连接到 Azure Blob 存储、创建 Blob 容器、上传输入文件,以及验证输出文件。You can use tools such as Azure Storage explorer to connect to Azure Blob storage, create a blob container, upload input file, and verify the output file.

示例脚本Sample script

重要

此脚本在硬盘驱动器上的 c:\ 文件夹中创建 JSON 文件,用于定义数据工厂实体(链接服务、数据集和管道)。This script creates JSON files that define Data Factory entities (linked service, dataset, and pipeline) on your hard drive in the c:\ folder.

# Set variables with your own values
$resourceGroupName = "<Azure resource group name>"
$dataFactoryName = "<Data factory name>" # must be globally unquie
$dataFactoryRegion = "China East 2" 
$storageAccountName = "<Az.Storage account name>"
$storageAccountKey = "<Az.Storage account key>"
$sourceBlobPath = "<Azure blob container name>/<Azure blob input folder name>" # example: adftutorial/input
$sinkBlobPath = "<Azure blob container name>/<Azure blob output folder name>" # example: adftutorial/output
$pipelineName = "CopyPipeline"

# Create a resource group
New-AzResourceGroup -Name $resourceGroupName -Location $dataFactoryRegion

# Create a data factory
$df = Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $dataFactoryRegion -Name $dataFactoryName 

# Create an Az.Storage linked service in the data factory

## JSON definition of the linked service. 
$storageLinkedServiceDefinition = @"
{
    "name": "AzureStorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": {
                "value": "DefaultEndpointsProtocol=https;AccountName=$storageAccountName;AccountKey=$storageAccountKey;EndpointSuffix=core.chinacloudapi.cn",
                "type": "SecureString"
            }
        }
    }
}
"@

## IMPORTANT: stores the JSON definition in a file that will be used by the Set-AzDataFactoryV2LinkedService command. 
$storageLinkedServiceDefinition | Out-File ./StorageLinkedService.json

## Creates a linked service in the data factory
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ./StorageLinkedService.json

# Create an Azure Blob dataset in the data factory

## JSON definition of the dataset
$datasetDefiniton = @"
{
    "name": "BlobDataset",
    "properties": {
        "type": "AzureBlob",
        "typeProperties": {
            "folderPath": {
                "value": "@{dataset().path}",
                "type": "Expression"
            }
        },
        "linkedServiceName": {
            "referenceName": "AzureStorageLinkedService",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "path": {
                "type": "String"
            }
        }
    }
}
"@

## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryV2Dataset command. 
$datasetDefiniton | Out-File ./BlobDataset.json

## Create a dataset in the data factory
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "BlobDataset" -File "./BlobDataset.json"

# Create a pipeline in the data factory

## JSON definition of the pipeline
$pipelineDefinition = @"
{
    "name": "$pipelineName",
    "properties": {
        "activities": [
            {
                "name": "CopyFromBlobToBlob",
                "type": "Copy",
                "inputs": [
                    {
                        "referenceName": "BlobDataset",
                        "parameters": {
                            "path": "@pipeline().parameters.inputPath"
                        },
                    "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "BlobDataset",
                        "parameters": {
                            "path": "@pipeline().parameters.outputPath"
                        },
                        "type": "DatasetReference"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "BlobSink"
                    }
                }
            }
        ],
        "parameters": {
            "inputPath": {
                "type": "String"
            },
            "outputPath": {
                "type": "String"
            }
        }
    }
}
"@

## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryV2Pipeline command. 
$pipelineDefinition | Out-File ./CopyPipeline.json

## Create a pipeline in the data factory
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "./CopyPipeline.json"

# Create a pipeline run 

## JSON definition for pipeline parameters
$pipelineParameters = @"
{
    "inputPath": "$sourceBlobPath",
    "outputPath": "$sinkBlobPath"
}
"@

## IMPORTANT: store the JSON definition in a file that will be used by the Invoke-AzDataFactoryV2Pipeline command. 
$pipelineParameters | Out-File ./PipelineParameters.json

# Create a pipeline run by using parameters
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName -ParameterFile ./PipelineParameters.json

# Check the pipeline run status until it finishes the copy operation
while ($True) {
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)

    if (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
        Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        Start-Sleep -Seconds 30
    }
    else {
        Write-Host "Pipeline '$pipelineName' run finished. Result:" -foregroundcolor "Yellow"
        $result
        break
    }
}

# Get the activity run details 
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName `
        -PipelineRunId $runId `
        -RunStartedAfter (Get-Date).AddMinutes(-10) `
        -RunStartedBefore (Get-Date).AddMinutes(10) `
        -ErrorAction Stop

    $result

    if ($result.Status -eq "Succeeded") {`
        $result.Output -join "`r`n"`
    }`
    else {`
        $result.Error -join "`r`n"`
    }

# To remove the data factory from the resource gorup
# Remove-AzDataFactoryV2 -Name $dataFactoryName -ResourceGroupName $resourceGroupName
# 
# To remove the whole resource group
# Remove-AzResourceGroup  -Name $resourceGroupName

清理部署Clean up deployment

运行示例脚本后,可以使用以下命令删除资源组以及与其关联的所有资源:After you run the sample script, you can use the following command to remove the resource group and all resources associated with it:

Remove-AzResourceGroup -ResourceGroupName $resourceGroupName

若要从资源组中删除数据工厂,请运行以下命令:To remove the data factory from the resource group, run the following command:

Remove-AzDataFactoryV2 -Name $dataFactoryName -ResourceGroupName $resourceGroupName

脚本说明Script explanation

此脚本使用以下命令:This script uses the following commands:

命令Command 注释Notes
New-AzResourceGroupNew-AzResourceGroup 创建用于存储所有资源的资源组。Creates a resource group in which all resources are stored.
Set-AzDataFactoryV2Set-AzDataFactoryV2 创建数据工厂。Create a data factory.
Set-AzDataFactoryV2LinkedServiceSet-AzDataFactoryV2LinkedService 在数据工厂中创建链接服务。Creates a linked service in the data factory. 链接服务可将数据存储或计算链接到数据工厂。A linked service links a data store or compute to a data factory.
Set-AzDataFactoryV2DatasetSet-AzDataFactoryV2Dataset 在数据工厂中创建数据集。Creates a dataset in the data factory. 数据集表示管道中活动的输入/输出。A dataset represents input/output for an activity in a pipeline.
Set-AzDataFactoryV2PipelineSet-AzDataFactoryV2Pipeline 在数据工厂中创建管道。Creates a pipeline in the data factory. 一个管道包含一个或多个执行某项操作的活动。A pipeline contains one or more activities that performs a certain operation. 在此管道中,复制活动在 Azure Blob 存储中将数据从一个位置复制到另一个位置。In this pipeline, a copy activity copies data from one location to another location in an Azure Blob Storage.
Invoke-AzDataFactoryV2PipelineInvoke-AzDataFactoryV2Pipeline 为管道创建运行。Creates a run for the pipeline. 换而言之,就是运行管道。In other words, runs the pipeline.
Get-AzDataFactoryV2ActivityRunGet-AzDataFactoryV2ActivityRun 获取管道中活动的运行(活动运行)的相关详细信息。Gets details about the run of the activity (activity run) in the pipeline.
Remove-AzResourceGroupRemove-AzResourceGroup 删除资源组,包括所有嵌套的资源。Deletes a resource group including all nested resources.

后续步骤Next steps

有关 Azure PowerShell 的详细信息,请参阅 Azure PowerShell 文档For more information on the Azure PowerShell, see Azure PowerShell documentation.

可以在 Azure 数据工厂 PowerShell 示例中找到其他 Azure 数据工厂 PowerShell 脚本示例。Additional Azure Data Factory PowerShell script samples can be found in the Azure Data Factory PowerShell samples.