快速入门:使用 .NET SDK 创建数据工厂和管道

适用于: Azure 数据工厂

此快速入门介绍了如何使用 .NET SDK 创建一个 Azure 数据工厂。 在此数据工厂中创建的管道会将数据从 Azure Blob 存储中的一个文件夹 复制 到另一个文件夹。 有关如何使用 Azure 数据工厂 转换 数据的教程,请参阅 教程:使用 Spark 转换数据

备注

本文不提供数据工厂服务的详细介绍。 有关 Azure 数据工厂服务的介绍,请参阅 Azure 数据工厂简介

先决条件

Azure 订阅

如果没有 Azure 订阅,请在开始前创建一个试用订阅

Azure 角色

若要创建数据工厂实例,用于登录到 Azure 的用户帐户必须属于参与者或所有者角色,或者是 Azure 订阅的管理员。 若要查看你在订阅中拥有的权限,请转到 Azure 门户,在右上角选择你的用户名,然后选择“...” 图标以显示更多选项,然后选择“我的权限” 。 如果可以访问多个订阅,请选择相应的订阅。

若要为数据工厂创建和管理子资源(包括数据集、链接服务、管道、触发器和集成运行时),以下要求适用:

  • 若要在 Azure 门户中创建和管理子资源,你必须属于资源组级别或更高级别的 数据工厂参与者 角色。
  • 若要使用 PowerShell 或 SDK 创建和管理子资源,资源级别或更高级别的 参与者 角色已足够。

有关如何将用户添加到角色的示例说明,请参阅添加角色一文。

有关详细信息,请参阅以下文章:

Azure 存储帐户

在本快速入门中,使用常规用途的 Azure 存储帐户(具体的说就是 Blob 存储)作为源 和目标 数据存储。 如果没有常规用途的 Azure 存储帐户,请参阅创建存储帐户创建一个。

获取存储帐户名称

在本快速入门中,将需要 Azure 存储帐户的名称。 以下过程提供的步骤用于获取存储帐户的名称:

  1. 在 Web 浏览器中,转到 Azure 门户并使用你的 Azure 用户名和密码登录。
  2. 从 Azure 门户菜单中,选择“所有服务”,然后选择“存储” > “存储帐户” 。 此外,也可以在任何页面中搜索和选择“存储帐户” 。
  3. 在“存储帐户”页中,筛选你的存储帐户(如果需要),然后选择它 。

此外,也可以在任何页面中搜索和选择“存储帐户” 。

创建 Blob 容器

本部分介绍如何在 Azure Blob 存储中创建名为 adftutorial 的 Blob 容器。

  1. 在“存储帐户”页上,选择“概述” > “容器”。

  2. 在 <Account name> - “容器”页的工具栏中,选择“容器” 。

  3. 在“新建容器” 对话框中,输入 adftutorial 作为名称,然后选择“确定” 。 <Account name> - “容器”页已更新为在容器列表中包含“adftutorial” 。

    容器列表

为 Blob 容器添加输入文件夹和文件

在此部分中,在创建的容器中创建名为“input”的文件夹,再将示例文件上传到 input 文件夹。 在开始之前,打开文本编辑器(如记事本),并创建包含以下内容的名为“emp.txt”的文件 :

John, Doe
Jane, Doe

将此文件保存在 C:\ADFv2QuickStartPSH 文件夹中 。 (如果此文件夹不存在,则创建它。)然后返回到 Azure 门户并执行以下步骤:

  1. 在上次离开的 <Account name> - “容器”页中,选择已更新的容器列表中的“adftutorial” 。

    1. 如果关闭了窗口或转到其他页,请再次登录到 Azure 门户
    2. 从 Azure 门户菜单中,选择“所有服务”,然后选择“存储” > “存储帐户” 。 此外,也可以在任何页面中搜索和选择“存储帐户” 。
    3. 选择存储帐户,然后选择“容器” > “adftutorial” 。
  2. 在“adftutorial”容器页面的工具栏上,选择“上传” 。

  3. 在“上传 Blob”页中,选择“文件”框,然后浏览到 emp.txt 文件并进行选择 。

  4. 展开“高级”标题 。 此页现在显示如下内容:

    选择“高级...”链接

  5. 在“上传到文件夹”框中,输入“输入”。

  6. 选择“上传”按钮。 应该会在列表中看到 emp.txt 文件和上传状态。

  7. 选择“关闭”图标 (X) 以关闭“上传 Blob”页面 。

让“adftutorial”容器页面保持打开状态 。 在本快速入门结束时可以使用它来验证输出。

Visual Studio

本文中的演练使用 Visual Studio 2019。 Visual Studio 2013、2015 或 2017 的过程略有不同。

在 Azure Active Directory 中创建应用程序

从“如何:使用门户创建可访问资源的 Azure AD 应用程序和服务主体”中的各部分开始,按照说明执行以下任务:

  1. 创建 Azure Active Directory 应用程序中,创建一个应用程序来表示正在本教程中创建的 .NET 应用程序。 对于登录 URL,可以提供虚拟 URL,如本文中所示 (https://contoso.org/exampleapp)。
  2. 获取用于登录的值中,获取应用程序 ID 和租户 ID,并记下这些值,稍后要在本教程中使用它们 。
  3. 证书与机密中,获取身份验证密钥,并记此值,稍后要在本教程中使用它。
  4. 将应用程序分配给角色中,在订阅级别将应用程序分配到“参与者”角色,让该应用程序可以在订阅中创建数据工厂。

创建 Visual Studio 项目

接下来,在 Visual Studio 中创建 C# 控制台应用程序:

  1. 启动 Visual Studio
  2. 在“开始”窗口中,选择“创建新项目” > “控制台应用(.NET Framework)” 。 需要 .NET 4.5.2 或更高版本。
  3. 在“项目名称”中,输入 ADFv2QuickStart 。
  4. 选择“创建”来创建项目。

安装 NuGet 包

  1. 选择“工具” > “NuGet 包管理器” > “包管理器控制台”。

  2. 在“包管理器控制台”窗格中,运行以下命令来安装包。 有关详细信息,请参阅 Microsoft.Azure.Management.DataFactory NuGet 包

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

创建数据工厂客户端

  1. 打开 Program.cs,包括以下语句来添加对命名空间的引用。

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Rest.Serialization;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. 将以下代码添加到 Main 方法以设置变量。 将占位符替换为自己的值。 若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库等)和计算资源(HDInsight 等)可以位于其他区域中。

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID where the data factory resides>";
    string resourceGroup = "<your resource group where the data factory resides>";
    string region = "<the location of your resource group>";
    string dataFactoryName = 
        "<specify the name of data factory to create. It must be globally unique.>";
    string storageAccount = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    // specify the container and input folder from which all files 
    // need to be copied to the output folder. 
    string inputBlobPath =
        "<path to existing blob(s) to copy data from, e.g. containername/inputdir>";
    //specify the contains and output folder where the files are copied
    string outputBlobPath =
        "<the blob path to copy data to, e.g. containername/outputdir>";
    
    // name of the Azure Storage linked service, blob dataset, and the pipeline
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string blobDatasetName = "BlobDataset";
    string pipelineName = "Adfv2QuickStartPipeline";
    
  3. Main 方法中添加用于创建 DataFactoryManagementClient 类的实例的以下代码。 将使用此对象创建数据工厂、链接服务、数据集以及管道。 还将使用此对象来监视管道运行详细信息。

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.chinacloudapi.cn/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync(
        "https://management.chinacloudapi.cn/", cc).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) {
        SubscriptionId = subscriptionId , BaseUri = new Uri("https://management.chinacloudapi.cn/")};
    

创建数据工厂

Main 方法中添加用于创建 数据工厂 的以下代码。

// Create a data factory
Console.WriteLine("Creating data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));

while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState ==
       "PendingCreation")
{
    System.Threading.Thread.Sleep(1000);
}

创建链接服务

Main 方法中添加用于创建 Azure 存储链接服务 的以下代码。

可在数据工厂中创建链接服务,将数据存储和计算服务链接到数据工厂。 在此快速入门中,只需创建一个 Azure 存储链接服务,用于复制源和接收器存储。在示例中,此服务名为“AzureStorageLinkedService”。

// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString(
            "DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
            ";AccountKey=" + storageKey +
            ";EndpointSuffix=core.chinacloudapi.cn")
    }
);
client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(
    storageLinkedService, client.SerializationSettings));

创建数据集

Main 方法中添加用于创建 Azure blob 数据集 的以下代码。

定义一个数据集来表示要从源复制到接收器的数据。 在此示例中,此 Blob 数据集引用在上一步中创建的 Azure 存储链接服务。 此数据集采用一个参数,其值在使用此数据集的活动中设置。 该参数用来构造“folderPath”,该项指向数据的驻留/存储位置。

// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = storageLinkedServiceName
        },
        FolderPath = new Expression { Value = "@{dataset().path}" },
        Parameters = new Dictionary<string, ParameterSpecification>
        {
            { "path", new ParameterSpecification { Type = ParameterType.String } }
        }
    }
);
client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, blobDatasetName, blobDataset);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));

创建管道

Main 方法中添加用于创建 包含复制活动的管道 的以下代码。

在此示例中,此管道包含一个活动并采用两个参数:输入 Blob 路径和输出 Blob 路径。 这些参数的值是在触发/运行管道时设置的。 复制活动引用在上一步中创建的同一 blob 数据集作为输入和输出。 当该数据集用作输入数据集时,即指定了输入路径。 并且,当该数据集用作输出数据集时,即指定了输出路径。

// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "inputPath", new ParameterSpecification { Type = ParameterType.String } },
        { "outputPath", new ParameterSpecification { Type = ParameterType.String } }
    },
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToBlob",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference()
                {
                    ReferenceName = blobDatasetName,
                    Parameters = new Dictionary<string, object>
                    {
                        { "path", "@pipeline().parameters.inputPath" }
                    }
                }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference
                {
                    ReferenceName = blobDatasetName,
                    Parameters = new Dictionary<string, object>
                    {
                        { "path", "@pipeline().parameters.outputPath" }
                    }
                }
            },
            Source = new BlobSource { },
            Sink = new BlobSink { }
        }
    }
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));

创建管道运行

Main 方法中添加用于 触发管道运行 的以下代码。

此代码还设置 inputPath 和 outputPath 参数的值,这些值是使用源和接收器 Blob 路径的实际值在管道中指定的值 。

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> parameters = new Dictionary<string, object>
{
    { "inputPath", inputBlobPath },
    { "outputPath", outputBlobPath }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
    resourceGroup, dataFactoryName, pipelineName, parameters: parameters
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

监视管道运行

  1. Main 方法中添加以下代码用于持续检查状态,直到它完成数据复制为止。

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(
            resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. 在 Main 方法中添加以下代码用于检索复制活动运行详细信息,例如,读取/写入的数据大小。

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10));
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams);
    if (pipelineRun.Status == "Succeeded")
        Console.WriteLine(queryResponse.Value.First().Output);
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

运行代码

生成并启动应用程序,然后验证管道执行。

控制台会输出数据工厂、链接服务、数据集、管道和管道运行的创建进度。 然后,检查管道运行状态。 请等待,直至看到包含数据读取/写入大小的复制活动运行详细信息。 然后,使用 Azure 存储资源管理器等工具检查 Blob 是否已根据变量中的指定从“inputBlobPath”复制到“outputBlobPath”。

示例输出

Creating data factory SPv2Factory0907...
{
  "identity": {
    "type": "SystemAssigned"
  },
  "location": "China East 2"
}
Creating linked service AzureStorageLinkedService...
{
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": {
        "value": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>;EndpointSuffix=core.chinacloudapi.cn",
        "type": "SecureString"
      }
    }
  }
}
Creating dataset BlobDataset...
{
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@{dataset().path}",
        "type": "Expression"
      }
    },
    "linkedServiceName": {
      "referenceName": "AzureStorageLinkedService",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "path": {
        "type": "String"
      }
    }
  }
}
Creating pipeline Adfv2QuickStartPipeline...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "inputs": [
          {
            "referenceName": "BlobDataset",
            "parameters": {
              "path": "@pipeline().parameters.inputPath"
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "BlobDataset",
            "parameters": {
              "path": "@pipeline().parameters.outputPath"
            },
            "type": "DatasetReference"
          }
        ],
        "name": "CopyFromBlobToBlob"
      }
    ],
    "parameters": {
      "inputPath": {
        "type": "String"
      },
      "outputPath": {
        "type": "String"
      }
    }
  }
}
Creating pipeline run...
Pipeline run ID: 308d222d-3858-48b1-9e66-acd921feaa09
Checking pipeline run status...
Status: InProgress
Status: InProgress
Checking copy activity run details...
{
    "dataRead": 331452208,
    "dataWritten": 331452208,
    "copyDuration": 23,
    "throughput": 14073.209,
    "errors": [],
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (China East 2)",
    "usedDataIntegrationUnits": 2,
    "billedDuration": 23
}

Press any key to exit...

验证输出

该管道自动在 adftutorial Blob 容器中创建 output 文件夹。 然后,将 emp.txt 文件从 input 文件夹复制到 output 文件夹。

  1. 在 Azure 门户中,在你在以上为 Blob 容器添加输入文件夹和文件部分中停留的 adftutorial 容器页上,选择“刷新”以查看输出文件夹 。
  2. 在文件夹列表中,选择“output”。
  3. 确认 emp.txt 已复制到 output 文件夹。

清理资源

若要以编程方式删除数据工厂,请向程序中添加以下代码行:

Console.WriteLine("Deleting the data factory");
client.Factories.Delete(resourceGroup, dataFactoryName);

后续步骤

此示例中的管道将数据从 Azure Blob 存储中的一个位置复制到另一个位置。 完成相关教程来了解如何在更多方案中使用数据工厂。