Quickstart: Create an Azure data factory and pipeline by using the REST API

APPLIES TO: Azure Data Factory. Does not apply to Azure Synapse Analytics (Preview).

Azure Data Factory is a cloud-based data integration service that allows you to create data-driven workflows in the cloud for orchestrating and automating data movement and data transformation. Using Azure Data Factory, you can create and schedule data-driven workflows (called pipelines) that ingest data from disparate data stores, process or transform the data by using compute services such as Azure HDInsight Hadoop and Spark, and publish output data to data stores such as Azure SQL Data Warehouse for business intelligence (BI) applications to consume.

This quickstart describes how to use the REST API to create an Azure data factory. The pipeline in this data factory copies data from one location to another location in Azure Blob storage.

If you don't have an Azure subscription, create a 1 RMB trial account before you begin.

Prerequisites

Note

This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure PowerShell.

  • Azure subscription. If you don't have a subscription, you can create a 1 RMB trial account.
  • Azure Storage account. You use the blob storage as both the source and the sink data store. If you don't have an Azure storage account, see the Create a storage account article for steps to create one.
  • Create a blob container in Blob storage, create an input folder in the container, and upload some files to the folder. You can use tools such as Azure Storage Explorer to connect to Azure Blob storage, create a blob container, upload the input file, and verify the output file.
  • Install Azure PowerShell. Follow the instructions in How to install and configure Azure PowerShell. This quickstart uses PowerShell to invoke REST API calls.
  • Create an application in Azure Active Directory by following these instructions. Make note of the following values, which you use in later steps: application ID, clientSecrets, and tenant ID. Assign the application to the "Contributor" role.

Set global variables

  1. Launch PowerShell. Keep Azure PowerShell open until the end of this quickstart. If you close and reopen it, you need to run the commands again.

    Run the following command, and enter the user name and password that you use to sign in to the Azure portal:

    Connect-AzAccount -Environment AzureChinaCloud
    

    Run the following command to view all the subscriptions for this account:

    Get-AzSubscription
    

    Run the following command to select the subscription that you want to work with. Replace SubscriptionId with the ID of your Azure subscription:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"
    
  2. After replacing the placeholders with your own values, run the following commands to set the global variables used in later steps.

    $tenantID = "<your tenant ID>"
    $appId = "<your application ID>"
    $clientSecrets = "<your clientSecrets for the application>"
    $subscriptionId = "<your subscription ID to create the factory>"
    $resourceGroupName = "<your resource group to create the factory>"
    $factoryName = "<specify the name of data factory to create. It must be globally unique.>"
    $apiVersion = "2018-06-01"
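
Every request in the later steps targets a URI built from these variables. As a minimal sketch of how those URIs are composed, the hypothetical helper below (Get-AdfRequestUri is not part of Azure PowerShell or of this quickstart) assembles one from its parts. The host name and path layout are copied from the requests used in this article; the parameter names and the ChildPath argument are assumptions made for illustration.

```powershell
# Hypothetical helper (not an Azure cmdlet): composes the management URI
# that the later steps write out by hand.
function Get-AdfRequestUri {
    param(
        [Parameter(Mandatory)] [string] $SubscriptionId,
        [Parameter(Mandatory)] [string] $ResourceGroupName,
        [Parameter(Mandatory)] [string] $FactoryName,
        [string] $ChildPath = '',          # for example 'datasets/InputDataset'
        [string] $ApiVersion = '2018-06-01'
    )
    $base = "https://management.chinacloudapi.cn/subscriptions/$SubscriptionId" +
            "/resourceGroups/$ResourceGroupName" +
            "/providers/Microsoft.DataFactory/factories/$FactoryName"
    # Append the child resource path only when one was supplied.
    if ($ChildPath) { $base = "$base/$($ChildPath.Trim('/'))" }
    "${base}?api-version=$ApiVersion"
}

# Example: the URI for a dataset named InputDataset.
Get-AdfRequestUri -SubscriptionId $subscriptionId -ResourceGroupName $resourceGroupName `
    -FactoryName $factoryName -ChildPath 'datasets/InputDataset' -ApiVersion $apiVersion
```

Building the URI in one place makes it harder for a misspelled variable name to slip into a single request.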
    

Authenticate with Azure AD

Run the following commands to authenticate with Azure Active Directory (AAD):

$AuthContext = [Microsoft.IdentityModel.Clients.ActiveDirectory.AuthenticationContext]"https://login.partner.microsoftonline.cn/${tenantId}"
$cred = New-Object -TypeName Microsoft.IdentityModel.Clients.ActiveDirectory.ClientCredential -ArgumentList ($appId, $clientSecrets)
$result = $AuthContext.AcquireTokenAsync("https://management.core.chinacloudapi.cn/", $cred).GetAwaiter().GetResult()
$authHeader = @{
'Content-Type'='application/json'
'Accept'='application/json'
'Authorization'=$result.CreateAuthorizationHeader()
}
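
The commands above rely on the ADAL types (Microsoft.IdentityModel.Clients.ActiveDirectory) being loaded in the session. If they are not available, one possible alternative is to request the token directly with the OAuth 2.0 client-credentials grant. The sketch below is an illustration under assumptions, not the method this quickstart prescribes: the helper names New-TokenRequest and New-AuthHeader are hypothetical, and it assumes the authority used above serves the standard /oauth2/token path.

```powershell
# Hypothetical helper: builds the arguments for a client-credentials token request.
function New-TokenRequest {
    param(
        [Parameter(Mandatory)] [string] $TenantId,
        [Parameter(Mandatory)] [string] $AppId,
        [Parameter(Mandatory)] [string] $ClientSecret
    )
    @{
        Method = 'POST'
        # Assumption: same authority as above, with the standard token path appended.
        Uri    = "https://login.partner.microsoftonline.cn/$TenantId/oauth2/token"
        # Invoke-RestMethod sends a hashtable body on POST as form-encoded fields.
        Body   = @{
            grant_type    = 'client_credentials'
            client_id     = $AppId
            client_secret = $ClientSecret
            resource      = 'https://management.core.chinacloudapi.cn/'
        }
    }
}

# Hypothetical helper: turns a token response into the headers used by later calls.
function New-AuthHeader {
    param([Parameter(Mandatory)] $TokenResponse)
    @{
        'Content-Type'  = 'application/json'
        'Accept'        = 'application/json'
        'Authorization' = "Bearer $($TokenResponse.access_token)"
    }
}

# Usage (makes a network call, so it is left commented out):
# $tokenRequest = New-TokenRequest -TenantId $tenantID -AppId $appId -ClientSecret $clientSecrets
# $authHeader   = New-AuthHeader -TokenResponse (Invoke-RestMethod @tokenRequest)
```

Separating the request construction from the network call keeps the secret handling in one function and lets the header logic be checked without contacting Azure AD.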

Create a data factory

Run the following commands to create a data factory:

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}?api-version=${apiVersion}"
$body = @"
{
    "name": "$factoryName",
    "location": "China East 2",
    "properties": {},
    "identity": {
        "type": "SystemAssigned"
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

Note the following points:

  • The name of the Azure data factory must be globally unique. If you receive the following error, change the name and try again.

    Data factory name "ADFv2QuickStartDataFactory" is not available.
    
  • For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. The data stores (Azure Storage, Azure SQL Database, and so on) and compute services (HDInsight and so on) used by the data factory can be in other regions.

Here is the sample response:

{  
    "name":"<dataFactoryName>",
    "identity":{  
        "type":"SystemAssigned",
        "principalId":"<service principal ID>",
        "tenantId":"<tenant ID>"
    },
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>",
    "type":"Microsoft.DataFactory/factories",
    "properties":{  
        "provisioningState":"Succeeded",
        "createTime":"2019-09-03T02:10:27.056273Z",
        "version":"2018-06-01"
    },
    "eTag":"\"0200c876-0000-0100-0000-5d6dcb930000\"",
    "location":"China East 2",
    "tags":{  

    }
}

Create linked services

You create linked services in a data factory to link your data stores and compute services to the data factory. In this quickstart, you only need to create one Azure Storage linked service, which serves as both the copy source and the sink store. It is named "AzureStorageLinkedService" in the sample.

Before executing the commands, replace <accountName> and <accountKey> with the name and key of your Azure storage account.

Run the following commands to create a linked service named AzureStorageLinkedService:

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/linkedservices/AzureStorageLinkedService?api-version=${apiVersion}"
$body = @"
{  
    "name":"AzureStorageLinkedService",
    "properties":{  
        "annotations":[  

        ],
        "type":"AzureBlobStorage",
        "typeProperties":{  
            "connectionString":"DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
        }
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

Here is the sample output:

{  
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>/linkedservices/AzureStorageLinkedService",
    "name":"AzureStorageLinkedService",
    "type":"Microsoft.DataFactory/factories/linkedservices",
    "properties":{  
        "annotations":[  

        ],
        "type":"AzureBlobStorage",
        "typeProperties":{  
            "connectionString":"DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
        }
    },
    "etag":"07011a57-0000-0100-0000-5d6e14a20000"
}

Create datasets

You define a dataset that represents the data to copy from a source to a sink. In this example, you create two datasets: InputDataset and OutputDataset. Both refer to the Azure Storage linked service that you created in the previous section. The input dataset represents the source data in the input folder. In the input dataset definition, you specify the blob container (adftutorial), the folder (input), and the file (emp.txt) that contain the source data. The output dataset represents the data that is copied to the destination. In the output dataset definition, you specify the blob container (adftutorial), the folder (output), and the file to which the data is copied.

Create InputDataset

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/datasets/InputDataset?api-version=${apiVersion}"
$body = @"
{  
    "name":"InputDataset",
    "properties":{  
        "linkedServiceName":{  
            "referenceName":"AzureStorageLinkedService",
            "type":"LinkedServiceReference"
        },
        "annotations":[  

        ],
        "type":"Binary",
        "typeProperties":{  
            "location":{  
                "type":"AzureBlobStorageLocation",
                "fileName":"emp.txt",
                "folderPath":"input",
                "container":"adftutorial"
            }
        }
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

Here is the sample output:

{  
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>/datasets/InputDataset",
    "name":"InputDataset",
    "type":"Microsoft.DataFactory/factories/datasets",
    "properties":{  
        "linkedServiceName":{  
            "referenceName":"AzureStorageLinkedService",
            "type":"LinkedServiceReference"
        },
        "annotations":[  

        ],
        "type":"Binary",
        "typeProperties":{  
            "location":"@{type=AzureBlobStorageLocation; fileName=emp.txt; folderPath=input; container=adftutorial}"
        }
    },
    "etag":"07011c57-0000-0100-0000-5d6e14b40000"
}
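
In the sample output, "location" appears as a string of the form "@{type=...}" rather than as a JSON object. This happens because ConvertTo-Json serializes only two levels of nesting by default and converts anything deeper to a string. The sketch below reproduces the effect offline and shows that passing -Depth keeps the nested object intact. The $sample object is a trimmed stand-in for the real response, built here only for illustration.

```powershell
# A trimmed stand-in for the dataset response, using values from the sample above.
$sample = @'
{"name":"InputDataset","properties":{"type":"Binary","typeProperties":{"location":{"type":"AzureBlobStorageLocation","fileName":"emp.txt","folderPath":"input","container":"adftutorial"}}}}
'@ | ConvertFrom-Json

# Default depth (2): "location" sits three levels down and is flattened to a string.
$shallow = $sample | ConvertTo-Json -WarningAction SilentlyContinue

# Explicit depth: "location" is emitted as a nested JSON object.
$deep = $sample | ConvertTo-Json -Depth 10
$deep
```

The same -Depth argument can be added to the "$response | ConvertTo-Json" lines in this quickstart when the full nested response is of interest.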

Create OutputDataset

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/datasets/OutputDataset?api-version=${apiVersion}"
$body = @"
{  
    "name":"OutputDataset",
    "properties":{  
        "linkedServiceName":{  
            "referenceName":"AzureStorageLinkedService",
            "type":"LinkedServiceReference"
        },
        "annotations":[  

        ],
        "type":"Binary",
        "typeProperties":{  
            "location":{  
                "type":"AzureBlobStorageLocation",
                "folderPath":"output",
                "container":"adftutorial"
            }
        }
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

Here is the sample output:

{  
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>/datasets/OutputDataset",
    "name":"OutputDataset",
    "type":"Microsoft.DataFactory/factories/datasets",
    "properties":{  
        "linkedServiceName":{  
            "referenceName":"AzureStorageLinkedService",
            "type":"LinkedServiceReference"
        },
        "annotations":[  

        ],
        "type":"Binary",
        "typeProperties":{  
            "location":"@{type=AzureBlobStorageLocation; folderPath=output; container=adftutorial}"
        }
    },
    "etag":"07013257-0000-0100-0000-5d6e18920000"
}

Create pipeline

In this example, the pipeline contains one activity and takes two parameters: the input blob path and the output blob path. The values for these parameters are set when the pipeline is triggered or run. The copy activity refers to the same blob dataset created in the previous step as input and output. When the dataset is used as an input dataset, the input path is specified. When the dataset is used as an output dataset, the output path is specified.

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelines/Adfv2QuickStartPipeline?api-version=${apiVersion}"
$body = @"
{
    "name": "Adfv2QuickStartPipeline",
    "properties": {
        "activities": [
            {
                "name": "CopyFromBlobToBlob",
                "type": "Copy",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "BinarySource",
                        "storeSettings": {
                            "type": "AzureBlobStorageReadSettings",
                            "recursive": true
                        }
                    },
                    "sink": {
                        "type": "BinarySink",
                        "storeSettings": {
                            "type": "AzureBlobStorageWriteSettings"
                        }
                    },
                    "enableStaging": false
                },
                "inputs": [
                    {
                        "referenceName": "InputDataset",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "OutputDataset",
                        "type": "DatasetReference"
                    }
                ]
            }
        ],
        "annotations": []
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

Here is the sample output:

{  
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>/pipelines/Adfv2QuickStartPipeline",
    "name":"Adfv2QuickStartPipeline",
    "type":"Microsoft.DataFactory/factories/pipelines",
    "properties":{  
        "activities":[  
            "@{name=CopyFromBlobToBlob; type=Copy; dependsOn=System.Object[]; policy=; userProperties=System.Object[]; typeProperties=; inputs=System.Object[]; outputs=System.Object[]}"
        ],
        "annotations":[  

        ]
    },
    "etag":"07012057-0000-0100-0000-5d6e14c00000"
}

Create pipeline run

In this step, you set the values of the inputPath and outputPath parameters specified in the pipeline to the actual source and sink blob paths, and trigger a pipeline run. The pipeline run ID returned in the response body is used later by the monitoring API.

Before saving the file, replace the values of inputPath and outputPath with the source and sink blob paths that you want to copy data from and to.

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelines/Adfv2QuickStartPipeline/createRun?api-version=${apiVersion}"
$response = Invoke-RestMethod -Method POST -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json
$runId = $response.runId

Here is the sample output:

{  
    "runId":"04a2bb9a-71ea-4c31-b46e-75276b61bafc"
}

Monitor pipeline

  1. Run the following script to continuously check the pipeline run status until it finishes copying the data.

    $request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelineruns/${runId}?api-version=${apiVersion}"
    while ($True) {
        $response = Invoke-RestMethod -Method GET -Uri $request -Header $authHeader
        Write-Host  "Pipeline run status: " $response.Status -foregroundcolor "Yellow"
    
        if ($response.Status -in "Queued", "InProgress") {
            Start-Sleep -Seconds 15
        }
        else {
            $response | ConvertTo-Json
            break
        }
    }
    

    Here is the sample output:

    {  
        "runId":"04a2bb9a-71ea-4c31-b46e-75276b61bafc",
        "debugRunId":null,
        "runGroupId":"04a2bb9a-71ea-4c31-b46e-75276b61bafc",
        "pipelineName":"Adfv2QuickStartPipeline",
        "parameters":{  
    
        },
        "invokedBy":{  
            "id":"2bb3938176ee43439752475aa12b2251",
            "name":"Manual",
            "invokedByType":"Manual"
        },
        "runStart":"2019-09-03T07:22:47.0075159Z",
        "runEnd":"2019-09-03T07:22:57.8862692Z",
        "durationInMs":10878,
        "status":"Succeeded",
        "message":"",
        "lastUpdated":"2019-09-03T07:22:57.8862692Z",
        "annotations":[  
    
        ],
        "runDimension":{  
    
        },
        "isLatest":true
    }
    
  2. Run the following script to retrieve the copy activity run details, such as the size of the data read and written.

    $request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelineruns/${runId}/queryActivityruns?api-version=${apiVersion}&startTime="+(Get-Date).ToString('yyyy-MM-dd')+"&endTime="+(Get-Date).AddDays(1).ToString('yyyy-MM-dd')+"&pipelineName=Adfv2QuickStartPipeline"
    $response = Invoke-RestMethod -Method POST -Uri $request -Header $authHeader
    $response | ConvertTo-Json
    

    Here is the sample output:

    {  
        "value":[  
            {  
                "activityRunEnd":"2019-09-03T07:22:56.6498704Z",
                "activityName":"CopyFromBlobToBlob",
                "activityRunStart":"2019-09-03T07:22:49.0719311Z",
                "activityType":"Copy",
                "durationInMs":7577,
                "retryAttempt":null,
                "error":"@{errorCode=; message=; failureType=; target=CopyFromBlobToBlob}",
                "activityRunId":"32951886-814a-4d6b-b82b-505936e227cc",
                "iterationHash":"",
                "input":"@{source=; sink=; enableStaging=False}",
                "linkedServiceName":"",
                "output":"@{dataRead=20; dataWritten=20; filesRead=1; filesWritten=1; sourcePeakConnections=1; sinkPeakConnections=1; copyDuration=4; throughput=0.01; errors=System.Object[]; effectiveIntegrationRuntime=DefaultIntegrationRuntime (China East 2); usedDataIntegrationUnits=4; usedParallelCopies=1; executionDetails=System.Object[]}",
                "userProperties":"",
                "pipelineName":"Adfv2QuickStartPipeline",
                "pipelineRunId":"04a2bb9a-71ea-4c31-b46e-75276b61bafc",
                "status":"Succeeded",
                "recoveryStatus":"None",
                "integrationRuntimeNames":"defaultintegrationruntime",
                "executionDetails":"@{integrationRuntime=System.Object[]}"
            }
        ]
    }
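
The polling loop in step 1 has no upper bound on how long it waits. As a minimal sketch of a bounded variant, the hypothetical function below (Wait-PipelineRun is not an Azure cmdlet) polls a caller-supplied script block until the run leaves a non-terminal state or a timeout expires. Treating "Queued" and "InProgress" as the non-terminal states is an assumption about the status values, and the default timeout of 600 seconds is arbitrary.

```powershell
# Hypothetical helper: polls until the run reaches a terminal state or times out.
function Wait-PipelineRun {
    param(
        [Parameter(Mandatory)] [scriptblock] $GetStatus,  # returns the current status string
        [int] $PollSeconds = 15,
        [int] $TimeoutSeconds = 600                       # arbitrary default for this sketch
    )
    $deadline = (Get-Date).AddSeconds($TimeoutSeconds)
    while ($true) {
        $status = & $GetStatus
        Write-Host "Pipeline run status: $status" -ForegroundColor Yellow
        # Assumption: any status other than these two is terminal.
        if ($status -notin 'Queued', 'InProgress') { return $status }
        if ((Get-Date) -ge $deadline) {
            throw "Timed out after $TimeoutSeconds seconds; last status was '$status'."
        }
        Start-Sleep -Seconds $PollSeconds
    }
}

# Usage (makes network calls, so it is left commented out):
# $finalStatus = Wait-PipelineRun -GetStatus {
#     (Invoke-RestMethod -Method GET -Uri $request -Header $authHeader).status
# }
```

Passing the status lookup as a script block keeps the waiting logic independent of the REST call, so it can be exercised with canned statuses.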
    

Verify the output

Use Azure Storage Explorer to check that the file was copied from "inputPath" to "outputPath", as you specified when creating the pipeline run.

Clean up resources

You can clean up the resources that you created in this quickstart in two ways. You can delete the Azure resource group, which deletes all the resources in the resource group. If you want to keep the other resources intact, delete only the data factory that you created in this quickstart.

Run the following command to delete the entire resource group:

Remove-AzResourceGroup -ResourceGroupName $resourceGroupName

Run the following command to delete only the data factory:

Remove-AzDataFactoryV2 -Name "<NameOfYourDataFactory>" -ResourceGroupName "<NameOfResourceGroup>"

Next steps

The pipeline in this sample copies data from one location to another location in Azure Blob storage. Go through the tutorials to learn about using Data Factory in more scenarios.