了解如何使用 HDInsight .NET SDK 提交 MapReduce 作业。 HDInsight 群集附带含有一些 MapReduce 示例的 jar 文件。 该 jar 文件为 /example/jars/hadoop-mapreduce-examples.jar
。 其中一个示例是 wordcount。 开发 C# 控制台应用程序,提交 wordcount 作业。 该作业读取 /example/data/gutenberg/davinci.txt
文件,并将结果输出到 /example/data/davinciwordcount
。 如果想要重新运行该应用程序,必须清理输出文件夹。
备注
必须从 Windows 客户端执行本文中的步骤。 有关使用 Linux、OS X 或 Unix 客户端来使用 Hive 的信息,请使用本文顶部显示的选项卡选择器。
HDInsight 中的 Apache Hadoop 群集。 请参阅使用 Azure 门户创建 Apache Hadoop 群集。
HDInsight .NET SDK 提供 .NET 客户端库,可简化从 .NET 中使用 HDInsight 群集的操作。
启动 Visual Studio 并创建 C# 控制台应用程序。
导航到“工具”>“NuGet 包管理器”>“包管理器控制台”,然后输入以下命令 :
Install-Package Microsoft.Azure.Management.HDInsight.Job
将下面的代码复制到 Program.cs 中。 然后,通过设置以下项的值来编辑代码:
existingClusterName
、existingClusterPassword
、defaultStorageAccountName
、defaultStorageAccountKey
、defaultStorageContainerName
。using System.Collections.Generic; using System.IO; using System.Text; using System.Threading; using Microsoft.Azure.Management.HDInsight.Job; using Microsoft.Azure.Management.HDInsight.Job.Models; using Hyak.Common; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; namespace SubmitHDInsightJobDotNet { class Program { private static HDInsightJobManagementClient _hdiJobManagementClient; private const string existingClusterName = "<Your HDInsight Cluster Name>"; private const string existingClusterPassword = "<Cluster User Password>"; private const string defaultStorageAccountName = "<Default Storage Account Name>"; private const string defaultStorageAccountKey = "<Default Storage Account Key>"; private const string defaultStorageContainerName = "<Default Blob Container Name>"; private const string existingClusterUsername = "admin"; private const string existingClusterUri = existingClusterName + ".azurehdinsight.cn"; private const string sourceFile = "/example/data/gutenberg/davinci.txt"; private const string outputFolder = "/example/data/davinciwordcount"; static void Main(string[] args) { System.Console.WriteLine("The application is running ..."); var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword }; _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials); SubmitMRJob(); System.Console.WriteLine("Press ENTER to continue ..."); System.Console.ReadLine(); } private static void SubmitMRJob() { List<string> args = new List<string> { { "/example/data/gutenberg/davinci.txt" }, { "/example/data/davinciwordcount" } }; var paras = new MapReduceJobSubmissionParameters { JarFile = @"/example/jars/hadoop-mapreduce-examples.jar", JarClass = "wordcount", Arguments = args }; System.Console.WriteLine("Submitting the MR job to the cluster..."); var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras); var jobId = jobResponse.JobSubmissionJsonResponse.Id; System.Console.WriteLine("Response status code is " + jobResponse.StatusCode); System.Console.WriteLine("JobId is " + jobId); System.Console.WriteLine("Waiting for the job completion ..."); // Wait for job completion var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; while (!jobDetail.Status.JobComplete) { Thread.Sleep(1000); jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; } // Get job output System.Console.WriteLine("Job output is: "); var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey, defaultStorageContainerName); if (jobDetail.ExitValue == 0) { // Create the storage account object CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" + defaultStorageAccountName + ";AccountKey=" + defaultStorageAccountKey); // Create the blob client. CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient(); // Retrieve reference to a previously created container. CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName); CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000"); using (var stream = blockBlob.OpenRead()) { using (StreamReader reader = new StreamReader(stream)) { while (!reader.EndOfStream) { System.Console.WriteLine(reader.ReadLine()); } } } } else { // fetch stderr output in case of failure var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); using (var reader = new StreamReader(output, Encoding.UTF8)) { string value = reader.ReadToEnd(); System.Console.WriteLine(value); } } } } }
按 F5 运行该应用程序。
若要重新运行该作业,必须更改作业输出文件夹的名称(在此示例中为 /example/data/davinciwordcount
)。
作业成功完成后,应用程序会输出输出文件 part-r-00000
的内容。
在本文中,你已了解几种创建 HDInsight 群集的方法。 要了解更多信息,请参阅下列文章:
- 有关如何提交 Hive 作业,请参阅使用 HDInsight .NET SDK 运行 Apache Hive 查询。
- 有关如何创建 HDInsight 群集,请参阅在 HDInsight 中创建基于 Linux 的 Apache Hadoop 群集。
- 有关如何管理 HDInsight 群集,请参阅在 HDInsight 中管理 Apache Hadoop 群集。
- 请参阅 HDInsight.NET SDK 参考资料,学习 HDInsight.NET SDK。
- 请参阅创建非交互式身份验证 .NET HDInsight 应用程序,了解对 Azure 的非交互式身份验证。