快速入门:使用 REST API 创建 Azure 数据工厂和管道

适用于: Azure 数据工厂 Azure Synapse Analytics

Azure 数据工厂是基于云的数据集成服务,用于在云中创建数据驱动型工作流,以便协调和自动完成数据移动和数据转换。 使用 Azure 数据工厂,可以创建和计划数据驱动型工作流(称为管道),以便从不同的数据存储引入数据,通过各种计算服务(例如 Azure HDInsight Hadoop、Spark 和 Azure 机器学习)处理/转换数据,将输出数据发布到数据存储(例如 Azure Synapse Analytics),供商业智能 (BI) 应用程序使用。

此快速入门介绍如何使用 REST API 创建 Azure 数据工厂。 此数据工厂中的管道将数据从 Azure Blob 存储中的一个位置复制到另一个位置。

如果没有 Azure 订阅,请在开始前创建一个试用帐户

先决条件

备注

本文已经过更新,以便使用 Azure Az PowerShell 模块。 若要与 Azure 交互,建议使用的 PowerShell 模块是 Az PowerShell 模块。 若要开始使用 Az PowerShell 模块,请参阅安装 Azure PowerShell。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az

  • Azure 订阅。 如果没有订阅,可以创建一个试用帐户。
  • Azure 存储帐户。 可以将 blob 存储用作 接收器 数据存储。 如果没有 Azure 存储帐户,请参阅创建存储帐户一文获取创建步骤。
  • 在 Blob 存储中创建一个 blob 容器,在该容器中创建一个输入 文件夹,并向该文件夹上传一些文件。 可以使用 Azure 存储资源管理器等工具连接到 Azure Blob 存储、创建 Blob 容器、上传输入文件,以及验证输出文件。
  • 安装 Azure PowerShell。 遵循如何安装和配置 Azure PowerShell 中的说明。 本快速入门使用 PowerShell 调用 REST API。
  • 按照 此说明在 Azure Active Directory 中创建应用程序。 记下要在后续步骤中使用的以下值:应用程序 IDclientSecrets租户 ID。 将应用程序分配到“参与者” 角色。

设置全局变量

  1. 启动 PowerShell。 在完成本快速入门之前,请将 Azure PowerShell 保持打开状态。 如果将它关闭再重新打开,则需要再次运行下述命令。

    运行以下命令并输入用于登录 Azure 门户的用户名和密码:

    Connect-AzAccount -Environment AzureChinaCloud
    

    运行以下命令查看此帐户的所有订阅:

    Get-AzSubscription
    

    运行以下命令选择要使用的订阅。 请将 SubscriptionId 替换为自己的 Azure 订阅的 ID:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"
    
  2. 将占位符替换为自己的值后,运行以下命令设置要在后续步骤中使用的全局变量。

    $tenantID = "<your tenant ID>"
    $appId = "<your application ID>"
    $clientSecrets = "<your clientSecrets for the application>"
    $subscriptionId = "<your subscription ID to create the factory>"
    $resourceGroupName = "<your resource group to create the factory>"
    $factoryName = "<specify the name of data factory to create. It must be globally unique.>"
    $apiVersion = "2018-06-01"
    

使用 Azure AD 进行身份验证

运行以下命令,使用 Azure Active Directory (AAD) 进行身份验证:

$AuthContext = [Microsoft.IdentityModel.Clients.ActiveDirectory.AuthenticationContext]"https://login.partner.microsoftonline.cn/${tenantId}"
$cred = New-Object -TypeName Microsoft.IdentityModel.Clients.ActiveDirectory.ClientCredential -ArgumentList ($appId, $clientSecrets)
$result = $AuthContext.AcquireTokenAsync("https://management.core.chinacloudapi.cn/", $cred).GetAwaiter().GetResult()
$authHeader = @{
'Content-Type'='application/json'
'Accept'='application/json'
'Authorization'=$result.CreateAuthorizationHeader()
}

创建数据工厂

运行以下命令以创建数据工厂:

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}?api-version=${apiVersion}"
$body = @"
{
    "name": "$factoryName",
    "location": "China East 2",
    "properties": {},
    "identity": {
        "type": "SystemAssigned"
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

请注意以下几点:

  • Azure 数据工厂的名称必须全局唯一。 如果收到以下错误,请更改名称并重试。

    Data factory name "ADFv2QuickStartDataFactory" is not available.
    
  • 若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析” 以找到“数据工厂” :可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库,等等)和计算资源(HDInsight 等)可以位于其他区域中。

下面是示例响应:

{  
    "name":"<dataFactoryName>",
    "identity":{  
        "type":"SystemAssigned",
        "principalId":"<service principal ID>",
        "tenantId":"<tenant ID>"
    },
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>",
    "type":"Microsoft.DataFactory/factories",
    "properties":{  
        "provisioningState":"Succeeded",
        "createTime":"2019-09-03T02:10:27.056273Z",
        "version":"2018-06-01"
    },
    "eTag":"\"0200c876-0000-0100-0000-5d6dcb930000\"",
    "location":"China East 2",
    "tags":{  

    }
}

创建链接服务

可在数据工厂中创建链接服务,将数据存储和计算服务链接到数据工厂。 在此快速入门中,只需创建一个同时作为复制源和接收器存储的 Azure 存储链接服务,在示例中名为“AzureStorageLinkedService”。

运行以下命令以创建名为 AzureStorageLinkedService 的链接服务:

在执行命令之前,将 <accountName> 和 <accountKey> 分别替换为 Azure 存储帐户的名称和密钥。

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/linkedservices/AzureStorageLinkedService?api-version=${apiVersion}"
$body = @"
{  
    "name":"AzureStorageLinkedService",
    "properties":{  
        "annotations":[  

        ],
        "type":"AzureBlobStorage",
        "typeProperties":{  
            "connectionString":"DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
        }
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

下面是示例输出:

{  
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>/linkedservices/AzureStorageLinkedService",
    "name":"AzureStorageLinkedService",
    "type":"Microsoft.DataFactory/factories/linkedservices",
    "properties":{  
        "annotations":[  

        ],
        "type":"AzureBlobStorage",
        "typeProperties":{  
            "connectionString":"DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
        }
    },
    "etag":"07011a57-0000-0100-0000-5d6e14a20000"
}

创建数据集

定义一个数据集来表示要从源复制到接收器的数据。 在此示例中,将创建两个数据集:InputDataset 和 OutputDataset。 它们引用在上一部分创建的 Azure 存储链接服务。 输入数据集表示输入文件夹中的源数据。 在输入数据集定义中,请指定包含源数据的 Blob 容器 (adftutorial)、文件夹 (input) 和文件 (emp.txt)。 输出数据集表示复制到目标的数据。 在输出数据集定义中,请指定要将数据复制到的 Blob 容器 (adftutorial)、文件夹 (output) 和文件。

创建 InputDataset

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/datasets/InputDataset?api-version=${apiVersion}"
$body = @"
{  
    "name":"InputDataset",
    "properties":{  
        "linkedServiceName":{  
            "referenceName":"AzureStorageLinkedService",
            "type":"LinkedServiceReference"
        },
        "annotations":[  

        ],
        "type":"Binary",
        "typeProperties":{  
            "location":{  
                "type":"AzureBlobStorageLocation",
                "fileName":"emp.txt",
                "folderPath":"input",
                "container":"adftutorial"
            }
        }
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

下面是示例输出:

{  
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>/datasets/InputDataset",
    "name":"InputDataset",
    "type":"Microsoft.DataFactory/factories/datasets",
    "properties":{  
        "linkedServiceName":{  
            "referenceName":"AzureStorageLinkedService",
            "type":"LinkedServiceReference"
        },
        "annotations":[  

        ],
        "type":"Binary",
        "typeProperties":{  
            "location":"@{type=AzureBlobStorageLocation; fileName=emp.txt; folderPath=input; container=adftutorial}"
        }
    },
    "etag":"07011c57-0000-0100-0000-5d6e14b40000"
}

创建 OutputDataset

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/datasets/OutputDataset?api-version=${apiVersion}"
$body = @"
{  
    "name":"OutputDataset",
    "properties":{  
        "linkedServiceName":{  
            "referenceName":"AzureStorageLinkedService",
            "type":"LinkedServiceReference"
        },
        "annotations":[  

        ],
        "type":"Binary",
        "typeProperties":{  
            "location":{  
                "type":"AzureBlobStorageLocation",
                "folderPath":"output",
                "container":"adftutorial"
            }
        }
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

下面是示例输出:

{  
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>/datasets/OutputDataset",
    "name":"OutputDataset",
    "type":"Microsoft.DataFactory/factories/datasets",
    "properties":{  
        "linkedServiceName":{  
            "referenceName":"AzureStorageLinkedService",
            "type":"LinkedServiceReference"
        },
        "annotations":[  

        ],
        "type":"Binary",
        "typeProperties":{  
            "location":"@{type=AzureBlobStorageLocation; folderPath=output; container=adftutorial}"
        }
    },
    "etag":"07013257-0000-0100-0000-5d6e18920000"
}

创建管道

在本示例中,此管道包含一个“复制活动”。 复制活动引用在上一步中创建的“InputDataset”和“OutputDataset”作为输入和输出。

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelines/Adfv2QuickStartPipeline?api-version=${apiVersion}"
$body = @"
{
    "name": "Adfv2QuickStartPipeline",
    "properties": {
        "activities": [
            {
                "name": "CopyFromBlobToBlob",
                "type": "Copy",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "BinarySource",
                        "storeSettings": {
                            "type": "AzureBlobStorageReadSettings",
                            "recursive": true
                        }
                    },
                    "sink": {
                        "type": "BinarySink",
                        "storeSettings": {
                            "type": "AzureBlobStorageWriteSettings"
                        }
                    },
                    "enableStaging": false
                },
                "inputs": [
                    {
                        "referenceName": "InputDataset",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "OutputDataset",
                        "type": "DatasetReference"
                    }
                ]
            }
        ],
        "annotations": []
    }
}
"@
$response = Invoke-RestMethod -Method PUT -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json

下面是示例输出:

{  
    "id":"/subscriptions/<subscriptionId>/resourceGroups/<resourceGroupName>/providers/Microsoft.DataFactory/factories/<dataFactoryName>/pipelines/Adfv2QuickStartPipeline",
    "name":"Adfv2QuickStartPipeline",
    "type":"Microsoft.DataFactory/factories/pipelines",
    "properties":{  
        "activities":[  
            "@{name=CopyFromBlobToBlob; type=Copy; dependsOn=System.Object[]; policy=; userProperties=System.Object[]; typeProperties=; inputs=System.Object[]; outputs=System.Object[]}"
        ],
        "annotations":[  

        ]
    },
    "etag":"07012057-0000-0100-0000-5d6e14c00000"
}

创建管道运行

在此步骤中,将触发管道运行。 响应正文中返回的管道运行 ID 将在后面的监视 API 中使用。

$request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelines/Adfv2QuickStartPipeline/createRun?api-version=${apiVersion}"
$response = Invoke-RestMethod -Method POST -Uri $request -Header $authHeader -Body $body
$response | ConvertTo-Json
$runId = $response.runId

下面是示例输出:

{  
    "runId":"04a2bb9a-71ea-4c31-b46e-75276b61bafc"
}

监视管道

  1. 运行以下脚本来持续检查管道运行状态,直到它完成数据复制为止。

    $request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelineruns/${runId}?api-version=${apiVersion}"
    while ($True) {
        $response = Invoke-RestMethod -Method GET -Uri $request -Header $authHeader
        Write-Host  "Pipeline run status: " $response.Status -foregroundcolor "Yellow"
    
        if ( ($response.Status -eq "InProgress") -or ($response.Status -eq "Queued") ) {
            Start-Sleep -Seconds 15
        }
        else {
            $response | ConvertTo-Json
            break
        }
    }
    

    下面是示例输出:

    {  
        "runId":"04a2bb9a-71ea-4c31-b46e-75276b61bafc",
        "debugRunId":null,
        "runGroupId":"04a2bb9a-71ea-4c31-b46e-75276b61bafc",
        "pipelineName":"Adfv2QuickStartPipeline",
        "parameters":{  
    
        },
        "invokedBy":{  
            "id":"2bb3938176ee43439752475aa12b2251",
            "name":"Manual",
            "invokedByType":"Manual"
        },
        "runStart":"2019-09-03T07:22:47.0075159Z",
        "runEnd":"2019-09-03T07:22:57.8862692Z",
        "durationInMs":10878,
        "status":"Succeeded",
        "message":"",
        "lastUpdated":"2019-09-03T07:22:57.8862692Z",
        "annotations":[  
    
        ],
        "runDimension":{  
    
        },
        "isLatest":true
    }
    
  2. 运行以下脚本来检索复制活动运行详细信息,例如,读取/写入的数据的大小。

    $request = "https://management.chinacloudapi.cn/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelineruns/${runId}/queryActivityruns?api-version=${apiVersion}&startTime="+(Get-Date).ToString('yyyy-MM-dd')+"&endTime="+(Get-Date).AddDays(1).ToString('yyyy-MM-dd')+"&pipelineName=Adfv2QuickStartPipeline"
    $response = Invoke-RestMethod -Method POST -Uri $request -Header $authHeader
    $response | ConvertTo-Json
    

    下面是示例输出:

    {  
        "value":[  
            {  
                "activityRunEnd":"2019-09-03T07:22:56.6498704Z",
                "activityName":"CopyFromBlobToBlob",
                "activityRunStart":"2019-09-03T07:22:49.0719311Z",
                "activityType":"Copy",
                "durationInMs":7577,
                "retryAttempt":null,
                "error":"@{errorCode=; message=; failureType=; target=CopyFromBlobToBlob}",
                "activityRunId":"32951886-814a-4d6b-b82b-505936e227cc",
                "iterationHash":"",
                "input":"@{source=; sink=; enableStaging=False}",
                "linkedServiceName":"",
                "output":"@{dataRead=20; dataWritten=20; filesRead=1; filesWritten=1; sourcePeakConnections=1; sinkPeakConnections=1; copyDuration=4; throughput=0.01; errors=System.Object[]; effectiveIntegrationRuntime=DefaultIntegrationRuntime (China East 2); usedDataIntegrationUnits=4; usedParallelCopies=1; executionDetails=System.Object[]}",
                "userProperties":"",
                "pipelineName":"Adfv2QuickStartPipeline",
                "pipelineRunId":"04a2bb9a-71ea-4c31-b46e-75276b61bafc",
                "status":"Succeeded",
                "recoveryStatus":"None",
                "integrationRuntimeNames":"defaultintegrationruntime",
                "executionDetails":"@{integrationRuntime=System.Object[]}"
            }
        ]
    }
    

验证输出

使用 Azure 存储资源管理器检查文件是否已根据创建管道运行时的指定从“inputPath”复制到“outputPath”。

清理资源

可以通过两种方式清理在快速入门中创建的资源。 可以删除 Azure 资源组,其中包括资源组中的所有资源。 若要使其他资源保持原封不动,请仅删除在此教程中创建的数据工厂。

运行以下命令可以删除整个资源组:

Remove-AzResourceGroup -ResourceGroupName $resourcegroupname

运行以下命令可以仅删除数据工厂:

Remove-AzDataFactoryV2 -Name "<NameOfYourDataFactory>" -ResourceGroupName "<NameOfResourceGroup>"

后续步骤

此示例中的管道将数据从 Azure Blob 存储中的一个位置复制到另一个位置。 完成相关教程来了解如何在更多方案中使用数据工厂。