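Run MapReduce jobs using the HDInsight .NET SDK

Learn how to submit MapReduce jobs using the HDInsight .NET SDK. HDInsight clusters come with a jar file, /example/jars/hadoop-mapreduce-examples.jar, that contains several MapReduce samples. One of the samples is wordcount. In this article, you develop a C# console application that submits a wordcount job. The job reads the /example/data/gutenberg/davinci.txt file and writes the results to /example/data/davinciwordcount. If you want to rerun the application, you must clean up the output folder first.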

Note

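The steps in this article must be performed from a Windows client. For information on using a Linux, OS X, or Unix client to work with MapReduce, use the tab selector shown at the top of this article.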

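Prerequisites

Before you begin this article, you must have the following:

Submit MapReduce jobs using the HDInsight .NET SDK

The HDInsight .NET SDK provides .NET client libraries that make it easier to work with HDInsight clusters from .NET.

Submit the job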

  1. Create a C# console application in Visual Studio.
  2. Run the following command from the NuGet Package Manager Console:

     Install-Package Microsoft.Azure.Management.HDInsight.Job
    
  3. Use the following code:

     using System.Collections.Generic;
     using System.IO;
     using System.Text;
     using System.Threading;
     using Microsoft.Azure.Management.HDInsight.Job;
     using Microsoft.Azure.Management.HDInsight.Job.Models;
     using Hyak.Common;
     using Microsoft.WindowsAzure.Storage;
     using Microsoft.WindowsAzure.Storage.Blob;
    
     namespace SubmitHDInsightJobDotNet
     {
         class Program
         {
             private static HDInsightJobManagementClient _hdiJobManagementClient;
    
             private const string existingClusterName = "<Your HDInsight Cluster Name>";
             private const string existingClusterUri = existingClusterName + ".azurehdinsight.cn";
             private const string existingClusterUsername = "<Cluster Username>";
             private const string existingClusterPassword = "<Cluster User Password>";
    
             private const string defaultStorageAccountName = "<Default Storage Account Name>"; //<StorageAccountName>.blob.core.chinacloudapi.cn
             private const string StorageAccountSuffix = "core.chinacloudapi.cn";
             private const string defaultStorageAccountKey = "<Default Storage Account Key>";
             private const string defaultStorageContainerName = "<Default Blob Container Name>";
    
             private const string sourceFile = "/example/data/gutenberg/davinci.txt";  
             private const string outputFolder = "/example/data/davinciwordcount";
    
             static void Main(string[] args)
             {
                 System.Console.WriteLine("The application is running ...");
    
                 // The constants above are declared in camelCase, so reference them with the same casing.
                 var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword };
                 _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials);
    
                 SubmitMRJob();
    
                 System.Console.WriteLine("Press ENTER to continue ...");
                 System.Console.ReadLine();
             }
    
             private static void SubmitMRJob()
             {
                 // Input file and output folder passed to the wordcount sample.
                 List<string> args = new List<string> { sourceFile, outputFolder };
    
                 var paras = new MapReduceJobSubmissionParameters
                 {
                     JarFile = @"/example/jars/hadoop-mapreduce-examples.jar",
                     JarClass = "wordcount",
                     Arguments = args
                 };
    
                 System.Console.WriteLine("Submitting the MR job to the cluster...");
                 var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras);
                 var jobId = jobResponse.JobSubmissionJsonResponse.Id;
                 System.Console.WriteLine("Response status code is " + jobResponse.StatusCode);
                 System.Console.WriteLine("JobId is " + jobId);
    
                 System.Console.WriteLine("Waiting for the job completion ...");
    
                 // Wait for job completion
                 var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                 while (!jobDetail.Status.JobComplete)
                 {
                     Thread.Sleep(1000);
                     jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                 }
    
                 // Get job output
                 System.Console.WriteLine("Job output is: ");
                 var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey,
                     defaultStorageContainerName, StorageAccountSuffix);
    
                 if (jobDetail.ExitValue == 0)
                 {
                     // Create the storage account object
                     CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" + 
                         defaultStorageAccountName + 
                         ";AccountKey=" + defaultStorageAccountKey + ";EndpointSuffix=" + StorageAccountSuffix);
    
                     // Create the blob client.
                     CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
    
                     // Retrieve reference to a previously created container.
                     CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName);
    
                     CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000");
    
                     using (var stream = blockBlob.OpenRead())
                     {
                         using (StreamReader reader = new StreamReader(stream))
                         {
                             while (!reader.EndOfStream)
                             {
                                 System.Console.WriteLine(reader.ReadLine());
                             }
                         }
                     }
                 }
                 else
                 {
                     // fetch stderr output in case of failure
                     var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); 
    
                     using (var reader = new StreamReader(output, Encoding.UTF8))
                     {
                         string value = reader.ReadToEnd();
                         System.Console.WriteLine(value);
                     }
    
                 }
             }
         }
     }
    
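  4. Press F5 to run the application.

To run the job again, you must either change the job output folder name (in this example, "/example/data/davinciwordcount") or delete the existing output folder first.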
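
Deleting the output folder between runs can be done with the same storage client library the sample already references. The following is a minimal sketch and has not been run against a live cluster. The class and method names (OutputFolderCleaner, DeleteOutputFolder) are hypothetical. It assumes the synchronous ListBlobs and DeleteIfExists methods of the .NET Framework build of the WindowsAzure.Storage package, and it assumes that the cluster stores a folder as a zero-length marker blob with the same name as the folder, in addition to the blobs underneath it.

```csharp
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

// Hypothetical helper; not part of the HDInsight SDK.
static class OutputFolderCleaner
{
    // Deletes every blob under the given folder, plus the folder marker blob if one exists.
    public static void DeleteOutputFolder(string connectionString, string containerName, string folder)
    {
        CloudStorageAccount account = CloudStorageAccount.Parse(connectionString);
        CloudBlobContainer container = account.CreateCloudBlobClient().GetContainerReference(containerName);

        // Blob names carry no leading slash: "/example/data/x" becomes "example/data/x".
        string folderName = folder.Trim('/');

        // A flat listing returns all blobs below the prefix, including nested ones.
        foreach (IListBlobItem item in container.ListBlobs(folderName + "/", useFlatBlobListing: true))
        {
            var blob = item as ICloudBlob;
            if (blob != null)
            {
                blob.DeleteIfExists();
            }
        }

        // Assumption: the folder itself is represented by a zero-length marker blob.
        container.GetBlockBlobReference(folderName).DeleteIfExists();
    }
}
```

In the sample above, a call such as OutputFolderCleaner.DeleteOutputFolder(connectionString, defaultStorageContainerName, outputFolder) would go before SubmitMRJob(), where connectionString is built the same way as the one passed to CloudStorageAccount.Parse.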

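When the job completes successfully, the application prints the content of the output file "part-r-00000".

Next steps

In this article, you learned how to submit a MapReduce job using the HDInsight .NET SDK. To learn more, see the following articles: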
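
The wordcount sample writes one line per word, with the word and its count separated by a tab. If printing the whole file is too noisy, the lines can be parsed and ranked instead. The sketch below is one way to do that. The WordCountOutput class and the TopWords method are hypothetical names introduced for illustration only.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper; not part of the HDInsight SDK.
static class WordCountOutput
{
    // Parses "word<TAB>count" lines and returns the n most frequent words.
    // Lines that do not match the expected shape are skipped.
    public static List<KeyValuePair<string, int>> TopWords(IEnumerable<string> lines, int n)
    {
        var counts = new List<KeyValuePair<string, int>>();
        foreach (string line in lines)
        {
            int tab = line.LastIndexOf('\t');
            int count;
            if (tab > 0 && int.TryParse(line.Substring(tab + 1), out count))
            {
                counts.Add(new KeyValuePair<string, int>(line.Substring(0, tab), count));
            }
        }
        return counts.OrderByDescending(p => p.Value).Take(n).ToList();
    }
}
```

To use it with the sample, collect the lines returned by reader.ReadLine() into a list instead of printing them, then pass that list to WordCountOutput.TopWords together with the number of words to keep.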