Run MapReduce jobs using HDInsight .NET SDK

Learn how to submit MapReduce jobs using the HDInsight .NET SDK. HDInsight clusters come with a jar file, /example/jars/hadoop-mapreduce-examples.jar, that contains several MapReduce samples. One of the samples is wordcount. In this article, you develop a C# console application that submits a wordcount job. The job reads the /example/data/gutenberg/davinci.txt file and writes the results to /example/data/davinciwordcount. If you want to rerun the application, you must first clean up the output folder.

Note

The steps in this article must be performed from a Windows client. For information on using a Linux, OS X, or Unix client to work with MapReduce, use the tab selector shown at the top of the article.

Prerequisites

Submit MapReduce jobs using HDInsight .NET SDK

The HDInsight .NET SDK provides .NET client libraries, which make it easier to work with HDInsight clusters from .NET.

  1. Start Visual Studio and create a C# console application.

  2. Navigate to Tools > NuGet Package Manager > Package Manager Console and enter the following command:

    Install-Package Microsoft.Azure.Management.HDInsight.Job
    
  3. Copy the code below into Program.cs. Then edit the code by setting the values for existingClusterName, existingClusterUsername, existingClusterPassword, defaultStorageAccountName, defaultStorageAccountKey, and defaultStorageContainerName.

        using System.Collections.Generic;
        using System.IO;
        using System.Text;
        using System.Threading;
        using Microsoft.Azure.Management.HDInsight.Job;
        using Microsoft.Azure.Management.HDInsight.Job.Models;
        using Hyak.Common;
        using Microsoft.WindowsAzure.Storage;
        using Microsoft.WindowsAzure.Storage.Blob;
    
        namespace SubmitHDInsightJobDotNet
        {
            class Program
            {
                private static HDInsightJobManagementClient _hdiJobManagementClient;
    
                private const string existingClusterName = "<Your HDInsight Cluster Name>";
                private const string existingClusterUri = existingClusterName + ".azurehdinsight.cn";
                private const string existingClusterUsername = "<Cluster Username>";
                private const string existingClusterPassword = "<Cluster User Password>";
    
                private const string defaultStorageAccountName = "<Default Storage Account Name>"; //<StorageAccountName>.blob.core.chinacloudapi.cn
                private const string StorageAccountSuffix = "core.chinacloudapi.cn";
                private const string defaultStorageAccountKey = "<Default Storage Account Key>";
                private const string defaultStorageContainerName = "<Default Blob Container Name>";
    
                private const string sourceFile = "/example/data/gutenberg/davinci.txt";  
                private const string outputFolder = "/example/data/davinciwordcount";
    
                static void Main(string[] args)
                {
                    System.Console.WriteLine("The application is running ...");
    
                    var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword };
                    _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials);
    
                    SubmitMRJob();
    
                    System.Console.WriteLine("Press ENTER to continue ...");
                    System.Console.ReadLine();
                }
    
                private static void SubmitMRJob()
                {
                    // Job arguments: the input file followed by the output folder
                    List<string> args = new List<string> { sourceFile, outputFolder };
    
                    var paras = new MapReduceJobSubmissionParameters
                    {
                        JarFile = @"/example/jars/hadoop-mapreduce-examples.jar",
                        JarClass = "wordcount",
                        Arguments = args
                    };
    
                    System.Console.WriteLine("Submitting the MR job to the cluster...");
                    var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras);
                    var jobId = jobResponse.JobSubmissionJsonResponse.Id;
                    System.Console.WriteLine("Response status code is " + jobResponse.StatusCode);
                    System.Console.WriteLine("JobId is " + jobId);
    
                    System.Console.WriteLine("Waiting for the job completion ...");
    
                    // Wait for job completion
                    var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                    while (!jobDetail.Status.JobComplete)
                    {
                        Thread.Sleep(1000);
                        jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                    }
    
                    // Get job output
                    System.Console.WriteLine("Job output is: ");
                    var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey,
                        defaultStorageContainerName, StorageAccountSuffix);
    
                    if (jobDetail.ExitValue == 0)
                    {
                        // Create the storage account object
                        CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" + 
                            defaultStorageAccountName + 
                            ";AccountKey=" + defaultStorageAccountKey + ";EndpointSuffix=" + StorageAccountSuffix);
    
                        // Create the blob client.
                        CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
    
                        // Retrieve reference to a previously created container.
                        CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName);
    
                        CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000");
    
                        using (var stream = blockBlob.OpenRead())
                        {
                            using (StreamReader reader = new StreamReader(stream))
                            {
                                while (!reader.EndOfStream)
                                {
                                    System.Console.WriteLine(reader.ReadLine());
                                }
                            }
                        }
                    }
                    else
                    {
                        // fetch stderr output in case of failure
                        var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); 
    
                        using (var reader = new StreamReader(output, Encoding.UTF8))
                        {
                            string value = reader.ReadToEnd();
                            System.Console.WriteLine(value);
                        }
    
                    }
                }
            }
        }
    
  4. Press F5 to run the application.

To run the job again, you must change the job output folder name. In this sample, it is "/example/data/davinciwordcount".
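One way to avoid editing the folder name by hand before each run is to derive it from the current time. The following is a minimal sketch, not part of the HDInsight SDK: `OutputFolderNaming` and `WithTimestamp` are hypothetical names introduced here for illustration, and the timestamp format is an arbitrary choice.

```csharp
using System;
using System.Globalization;

static class OutputFolderNaming
{
    // Hypothetical helper (not part of the HDInsight SDK): appends a UTC
    // timestamp to a base folder so that each run writes to a new location.
    public static string WithTimestamp(string baseFolder, DateTime utcNow)
    {
        // The invariant culture keeps the digits independent of the machine locale.
        string stamp = utcNow.ToString("yyyyMMdd-HHmmss", CultureInfo.InvariantCulture);
        return baseFolder.TrimEnd('/') + "-" + stamp;
    }
}
```

To use it in the sample, outputFolder would have to become a `static readonly` field instead of a `const`, for example `OutputFolderNaming.WithTimestamp("/example/data/davinciwordcount", DateTime.UtcNow)`, because a constant cannot be computed at run time.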

When the job completes successfully, the application prints the content of the output file "part-r-00000".
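The output file can be long, so it may be more useful to summarize it than to print every line. The sketch below assumes that each line of part-r-00000 has the form word, tab character, count, which is the default text output format of the wordcount sample; lines that do not match are skipped. `WordCountOutput` and `TopWords` are hypothetical names introduced here, not part of any SDK.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

static class WordCountOutput
{
    // Hypothetical helper: reads "word<TAB>count" lines and returns the n
    // entries with the highest counts, in descending order of count.
    public static List<KeyValuePair<string, long>> TopWords(TextReader reader, int n)
    {
        var counts = new List<KeyValuePair<string, long>>();
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            // Split at the last tab so the count is always the final field.
            int tab = line.LastIndexOf('\t');
            if (tab <= 0) continue; // no tab, or empty word: skip the line

            long count;
            if (long.TryParse(line.Substring(tab + 1), out count))
            {
                counts.Add(new KeyValuePair<string, long>(line.Substring(0, tab), count));
            }
        }
        return counts.OrderByDescending(p => p.Value).Take(n).ToList();
    }
}
```

In the sample, the inner `while` loop that prints each line could be replaced with a call such as `WordCountOutput.TopWords(reader, 10)`, followed by a loop that prints each pair's `Key` and `Value`.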

Next steps

In this article, you have learned how to submit a MapReduce job to an HDInsight cluster using the HDInsight .NET SDK. To learn more, see the following articles: