Use C# with MapReduce streaming on Apache Hadoop in HDInsight

Learn how to use C# to create a MapReduce solution on HDInsight.

Apache Hadoop streaming is a utility that allows you to run MapReduce jobs using a script or executable. In this example, .NET is used to implement the mapper and reducer for a word count solution.

.NET on HDInsight

HDInsight clusters use Mono (https://mono-project.com) to run .NET applications. Mono version 4.2.1 is included with HDInsight version 3.6. For more information on the version of Mono included with HDInsight, see Apache Hadoop components available with different HDInsight versions.

For more information on Mono compatibility with .NET Framework versions, see Mono compatibility.

How Hadoop streaming works

The basic process used for streaming in this document is as follows:

  1. Hadoop passes data to the mapper (mapper.exe in this example) on STDIN.
  2. The mapper processes the data, and emits tab-delimited key/value pairs to STDOUT.
  3. The output is read by Hadoop, and then passed to the reducer (reducer.exe in this example) on STDIN.
  4. The reducer reads the tab-delimited key/value pairs, processes the data, and then emits the result as tab-delimited key/value pairs on STDOUT.
  5. The output is read by Hadoop and written to the output directory.
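
For example, a single input line reading "hello world hello" moves through these stages as shown below. Hadoop sorts the mapper output by key before handing it to the reducer, and a tab character separates each key from its value:

    input line:      hello world hello
    mapper emits:    hello   1
                     world   1
                     hello   1
    reducer emits:   hello   2
                     world   1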

For more information on streaming, see Hadoop Streaming.

Prerequisites

  • Visual Studio.

  • Familiarity with writing and building C# code that targets .NET Framework 4.5.

  • A way to upload .exe files to the cluster. The steps in this document use the Data Lake Tools for Visual Studio to upload the files to primary storage for the cluster.

  • If using PowerShell, you'll need the Az Module.

  • An SSH client (optional). For more information, see Connect to HDInsight (Apache Hadoop) using SSH.

  • An Apache Hadoop cluster on HDInsight. See Get Started with HDInsight on Linux.

  • The URI scheme for your cluster's primary storage: wasb:// for Azure Storage, or abfs:// for Azure Data Lake Storage Gen2. If secure transfer is enabled for Azure Storage or Data Lake Storage Gen2, the URI is wasbs:// or abfss://, respectively. See also: secure transfer.

Create the mapper

In Visual Studio, create a new .NET Framework console application named mapper. Use the following code for the application:

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the words
                foreach(var word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1",word);
                }
            }
        }
    }
}

After you create the application, build it to produce the /bin/Debug/mapper.exe file in the project directory.
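
Because the mapper is an ordinary console application, you can sanity-check it locally before involving Hadoop by piping a line of text to it. A minimal sketch from a PowerShell prompt in the mapper project directory (the sample input string is arbitrary):

# Pipe a test line to the mapper on STDIN. Punctuation and digits are
# stripped; each remaining word is emitted as word<TAB>1 on STDOUT.
'Hello, world! 123 hello' | .\bin\Debug\mapper.exe

The expected output is three tab-delimited pairs: Hello, world, and hello, each with a count of 1. Note that matching is case-sensitive, so Hello and hello are separate keys.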

Create the reducer

In Visual Studio, create a new .NET Framework console application named reducer. Use the following code for the application:

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if(words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                } else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and its total count.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

After you create the application, build it to produce the /bin/Debug/reducer.exe file in the project directory.
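
With both executables built, you can simulate the entire streaming flow locally by chaining them with a sort step, mirroring the sort/shuffle that Hadoop performs between the map and reduce phases. (This particular reducer doesn't require sorted input, since it accumulates counts in a dictionary, but the sort keeps the simulation faithful.) A minimal sketch from a PowerShell prompt in the solution directory, assuming a local text file named input.txt:

# Simulate the Hadoop streaming pipeline against a local file:
# mapper -> sort by key -> reducer.
Get-Content .\input.txt |
    .\mapper\bin\Debug\mapper.exe |
    Sort-Object |
    .\reducer\bin\Debug\reducer.exe

Each line of the final output is a word, a tab, and the total count for that word.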

Upload to storage

Next, you need to upload the mapper and reducer applications to HDInsight storage.

  1. In Visual Studio, select View > Server Explorer.

  2. Right-click Azure, select Connect to Microsoft Azure Subscription..., and complete the sign-in process.

  3. Expand the HDInsight cluster that you wish to deploy this application to. An entry with the text (Default Storage Account) is listed.

    (Screenshot: storage account for an HDInsight cluster in Server Explorer, Visual Studio.)

    • If the (Default Storage Account) entry can be expanded, you're using an Azure Storage account as default storage for the cluster. To view the files on the default storage for the cluster, expand the entry and then double-click (Default Container).

    • If the (Default Storage Account) entry can't be expanded, you're using Azure Data Lake Storage as the default storage for the cluster. To view the files on the default storage for the cluster, double-click the (Default Storage Account) entry.

  4. To upload the .exe files, use one of the following methods:

    • If you're using an Azure Storage account, select the Upload Blob icon.

      (Screenshot: HDInsight upload icon for mapper, Visual Studio.)

      In the Upload New File dialog box, under File name, select Browse. In the Upload Blob dialog box, go to the bin\debug folder for the mapper project, and then choose the mapper.exe file. Finally, select Open and then OK to complete the upload.

    • For Azure Data Lake Storage, right-click an empty area in the file listing, and then select Upload. Finally, select the mapper.exe file and then select Open.

    Once the mapper.exe upload has finished, repeat the upload process for the reducer.exe file.
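
If you'd rather script the upload than use Server Explorer, the sketch below uses the Az PowerShell module. It assumes the cluster's default storage is an Azure Storage account; the account, resource group, and container names are placeholders to replace with your own values:

# Upload mapper.exe and reducer.exe to the root of the cluster's
# default container (placeholder names; substitute your own).
$key = (Get-AzStorageAccountKey `
    -ResourceGroupName "myResourceGroup" `
    -Name "mystorageaccount")[0].Value
$context = New-AzStorageContext `
    -StorageAccountName "mystorageaccount" `
    -StorageAccountKey $key

# Keeping the files at the container root lets the job reference them
# as /mapper.exe and /reducer.exe.
Set-AzStorageBlobContent -File .\mapper\bin\Debug\mapper.exe `
    -Container "mycontainer" -Blob "mapper.exe" -Context $context
Set-AzStorageBlobContent -File .\reducer\bin\Debug\reducer.exe `
    -Container "mycontainer" -Blob "reducer.exe" -Context $context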

Run a job: Using an SSH session

The following procedure describes how to run a MapReduce job using an SSH session:

  1. Use the ssh command to connect to your cluster. Edit the following command by replacing CLUSTERNAME with the name of your cluster, and then enter the command:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
    
  2. Use one of the following commands to start the MapReduce job:

    • If the default storage is Azure Storage:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files wasbs:///mapper.exe,wasbs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • If the default storage is Data Lake Storage Gen2:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files abfs:///mapper.exe,abfs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      

    The following list describes what each parameter and option represents:

    • hadoop-streaming.jar: Specifies the jar file that contains the streaming MapReduce functionality.
    • -files: Specifies the mapper.exe and reducer.exe files for this job. The wasbs:///, adl:///, or abfs:/// protocol declaration before each file is the path to the root of default storage for the cluster.
    • -mapper: Specifies the file that implements the mapper.
    • -reducer: Specifies the file that implements the reducer.
    • -input: Specifies the input data.
    • -output: Specifies the output directory.
  3. Once the MapReduce job completes, use the following command to view the results:

    hdfs dfs -text /example/wordcountout/part-00000
    

    The following text is an example of the data returned by this command:

    you     1128
    young   38
    younger 1
    youngest        1
    your    338
    yours   4
    yourself        34
    yourselves      3
    youth   17
    

Run a job: Using PowerShell

Use the following PowerShell script to run a MapReduce job and download the results.

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount -Environment AzureChinaCloud
}

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

# Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
#       of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
# Azure Storage account
# Get the container
$container=$clusterInfo.DefaultStorageContainer
#NOTE: This assumes that the storage account is in the same resource
#      group as HDInsight. If it is not, change the
#      -ResourceGroupName parameter to the group that contains storage.
$storageAccountKey=(Get-AzStorageAccountKey `
    -Name $storageAccountName `
    -ResourceGroupName $resourceGroup)[0].Value

#Create a storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey
# Download the output file. The blob path matches the job's
# $outputPath without the leading slash; the streaming job writes
# its reducer output to part-00000.
Get-AzStorageBlobContent `
    -Blob 'example/wordcountoutput/part-00000' `
    -Container $container `
    -Destination output.txt `
    -Context $context

This script prompts you for the cluster login account name and password, along with the HDInsight cluster name. Once the job completes, the output is downloaded to a file named output.txt. The following text is an example of the data in the output.txt file:

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17
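
If the job fails or output.txt comes back empty, the job's standard error stream usually explains why. The following sketch retrieves it with the Az module, reusing the $clusterName, $creds, and $job variables from the script above:

# Retrieve the job's STDERR output for troubleshooting.
Get-AzHDInsightJobOutput `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError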

Next steps

For more information on using MapReduce with HDInsight, see Use MapReduce in Apache Hadoop on HDInsight.

For information on using C# with Hive and Pig, see Use a C# user-defined function with Apache Hive and Apache Pig.

For information on using C# with Storm on HDInsight, see Develop C# topologies for Apache Storm on HDInsight.