使用 HDInsight .NET SDK 运行 MapReduce 作业
了解如何使用 HDInsight .NET SDK 提交 MapReduce 作业。 HDInsight 群集附带含有一些 MapReduce 示例的 jar 文件。 该 jar 文件为 /example/jars/hadoop-mapreduce-examples.jar
。 其中一个示例是 wordcount。 开发 C# 控制台应用程序,提交 wordcount 作业。 该作业读取 /example/data/gutenberg/davinci.txt
文件,并将结果输出到 /example/data/davinciwordcount
。 如果想要重新运行该应用程序,必须清理输出文件夹。
注意
必须从 Windows 客户端执行本文中的步骤。 有关使用 Linux、OS X 或 Unix 客户端来使用 Hive 的信息,请使用本文顶部显示的选项卡选择器。
先决条件
HDInsight 中的 Apache Hadoop 群集。 请参阅使用 Azure 门户创建 Apache Hadoop 群集。
使用 HDInsight .NET SDK 提交 MapReduce 作业
HDInsight .NET SDK 提供 .NET 客户端库,可简化从 .NET 中使用 HDInsight 群集的操作。
启动 Visual Studio 并创建 C# 控制台应用程序。
导航到“工具”>“NuGet 包管理器”>“包管理器控制台”,然后输入以下命令 :
Install-Package Microsoft.Azure.Management.HDInsight.Job
将下面的代码复制到 Program.cs 中。 然后,通过设置以下项的值来编辑代码:
existingClusterName
、existingClusterPassword
、defaultStorageAccountName
、defaultStorageAccountKey
、defaultStorageContainerName
。using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using Microsoft.Azure.Management.HDInsight.Job; using Microsoft.Azure.Management.HDInsight.Job.Models; using Hyak.Common; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; namespace SubmitHDInsightJobDotNet { class Program { private static HDInsightJobManagementClient _hdiJobManagementClient; private const string existingClusterName = "<Your HDInsight Cluster Name>"; private const string existingClusterPassword = "<Cluster User Password>"; private const string defaultStorageAccountName = "<Default Storage Account Name>"; private const string defaultStorageAccountKey = "<Default Storage Account Key>"; private const string defaultStorageContainerName = "<Default Blob Container Name>"; private const string existingClusterUsername = "admin"; private const string existingClusterUri = existingClusterName + ".azurehdinsight.cn"; private const string sourceFile = "/example/data/gutenberg/davinci.txt"; private const string outputFolder = "/example/data/davinciwordcount"; static void Main(string[] args) { System.Console.WriteLine("The application is running ..."); var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword }; _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials); SubmitMRJob(); System.Console.WriteLine("Press ENTER to continue ..."); System.Console.ReadLine(); } private static void SubmitMRJob() { List<string> args = new List<string> { { "/example/data/gutenberg/davinci.txt" }, { "/example/data/davinciwordcount" } }; var paras = new MapReduceJobSubmissionParameters { JarFile = @"/example/jars/hadoop-mapreduce-examples.jar", JarClass = "wordcount", Arguments = args }; System.Console.WriteLine("Submitting the MR job to the cluster..."); var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras); var jobId = jobResponse.JobSubmissionJsonResponse.Id; System.Console.WriteLine("Response status code is " + jobResponse.StatusCode); System.Console.WriteLine("JobId is " + jobId); System.Console.WriteLine("Waiting for the job completion ..."); // Wait for job completion var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; while (!jobDetail.Status.JobComplete) { Thread.Sleep(1000); jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; } // Get job output System.Console.WriteLine("Job output is: "); var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey, defaultStorageContainerName); if (jobDetail.ExitValue == 0) { // Create the storage account object CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" + defaultStorageAccountName + ";AccountKey=" + defaultStorageAccountKey); // Create the blob client. CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient(); // Retrieve reference to a previously created container. CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName); CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000"); using (var stream = blockBlob.OpenRead()) { using (StreamReader reader = new StreamReader(stream)) { while (!reader.EndOfStream) { System.Console.WriteLine(reader.ReadLine()); } } } } else { // fetch stderr output in case of failure var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); using (var reader = new StreamReader(output, Encoding.UTF8)) { string value = reader.ReadToEnd(); System.Console.WriteLine(value); } } } } }
按 F5 运行该应用程序。
若要重新运行该作业,必须更改作业输出文件夹的名称(在此示例中为 /example/data/davinciwordcount
)。
作业成功完成后,应用程序会输出输出文件 part-r-00000
的内容。
后续步骤
在本文中,你已了解几种创建 HDInsight 群集的方法。 要了解更多信息,请参阅下列文章:
- 有关如何提交 Hive 作业,请参阅使用 HDInsight .NET SDK 运行 Apache Hive 查询。
- 有关如何创建 HDInsight 群集,请参阅在 HDInsight 中创建基于 Linux 的 Apache Hadoop 群集。
- 有关如何管理 HDInsight 群集,请参阅在 HDInsight 中管理 Apache Hadoop 群集。
- 请参阅 HDInsight.NET SDK 参考资料,学习 HDInsight.NET SDK。
- 请参阅创建非交互式身份验证 .NET HDInsight 应用程序,了解对 Azure 的非交互式身份验证。