Leer en inglés

Compartir a través de

在 HDInsight 中的 Apache Hadoop 上将 C# 与 MapReduce 流式处理配合使用

了解如何在 HDInsight 上使用 C# 创建 MapReduce 解决方案。

Apache Hadoop 流式处理使你能够使用脚本或可执行文件运行 MapReduce 作业。 此处,.NET 用于为单词计数解决方案实现映射器和化简器。

HDInsight 上的 .NET

HDInsight 群集使用 Mono 运行 .NET 应用程序。 HDInsight 版本 3.6 附带了 Mono 版本 4.2.1。 有关 HDInsight 附带的 Mono 版本的详细信息,请参阅 HDInsight 版本中提供的 Apache Hadoop 组件

有关 Mono 与 .NET Framework 版本的兼容性的详细信息,请参阅 Mono 兼容性

Hadoop 流式处理的工作原理

在本文档中用于流处理的基本流程如下所示:

  1. Hadoop 在 STDIN 上将数据传递到映射器(在本示例中为 mapper.exe)。
  2. 映射器处理数据,并向 STDOUT 发出制表符分隔的键/值对。
  3. 该输出由 Hadoop 读取,随后将传递到 STDIN 上的化简器(在本示例中为 reducer.exe)。
  4. 化简器将读取制表符分隔的键/值对、处理数据,并将结果作为制表符分隔的键/值对在 STDOUT 上发出。
  5. 该输出由 Hadoop 读取,并写入输出目录。

有关流式处理的详细信息,请参阅 Hadoop 流式处理

先决条件

  • Visual Studio。

  • 熟悉编写和生成面向 .NET Framework 4.5 的 C# 代码。

  • 将 .exe 文件上传到群集的方法。 本文档中的各个步骤都使用针对 Visual Studio 的 Data Lake 工具将文件上传到群集的主要存储。

  • 如果使用 PowerShell,则需要 Az 模块

  • HDInsight 中的 Apache Hadoop 群集。 请参阅 Linux 上的 HDInsight 入门

  • 群集主存储的 URI 方案。 对于 Azure 存储,此架构为 wasb://;对于Azure Data Lake Storage Gen2,此架构为 abfs://;对于 Azure Data Lake Storage Gen1,此架构为 adl://。 如果为 Azure 存储或 Data Lake Storage Gen2 启用了安全传输,则 URI 分别是 wasbs://abfss://

创建映射器

在 Visual Studio 中,创建新的名为 mapper 的 .NET Framework 控制台应用程序。 针对该应用程序使用以下代码:

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the words
                foreach(var word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1",word);
                }
            }
        }
    }
}

创建该应用程序后,构建它以在项目目录中生成 /bin/Debug/mapper.exe 文件。

创建化简器

在 Visual Studio 中,创建新的名为 化简器的 .NET Framework 控制台应用程序。 针对该应用程序使用以下代码:

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if(words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                } else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and a count of 1.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

创建该应用程序后,构建它以在项目目录中生成 /bin/Debug/reducer.exe 文件。

上传到存储

接下来,需要将 映射器化简器 应用程序上传到 HDInsight 存储。

  1. 在 Visual Studio 中,选择 “查看>服务器资源管理器”。

  2. 右键单击 Azure,选择“ 连接到 Azure 订阅...”,并完成登录过程。

  3. 扩展您希望部署此应用程序的 HDInsight 群集。 列出了包含文本 (默认存储帐户)的 条目。

    存储帐户、HDInsight 群集、服务器资源管理器、Visual Studio。

    • 如果 可以扩展(默认存储帐户) 条目,则使用 Azure 存储帐户 作为群集的默认存储。 若要查看群集的默认存储中的文件,请展开该项,然后双击“(默认容器)”。

    • 如果 无法扩展(默认存储帐户) 条目,则使用 Azure Data Lake Storage 作为群集的默认存储。 若要查看该群集的默认存储上的文件,请双击“(默认存储帐户)”条目。

  4. 若要上传 .exe 文件,请使用以下方法之一:

    • 如果使用 Azure 存储帐户,请选择 “上传 Blob ”图标。

      用于 Visual Studio 映射器的 HDInsight 上传图标。

      在“上传新文件”对话框中的“文件名”下,选择“浏览”。 在“上传 Blob”对话框中,转到“mapper”项目的“bin\debug”文件夹,然后选择“mapper.exe”文件。 最后,依次选择“打开”、“确定”完成上传。

    • 对于“Azure Data Lake Storage”,请右键单击文件列表中的空白区域,然后选择“上传”。 最后,先选择“mapper.exe”文件,然后选择“打开”。

    上传“mapper.exe”完成后,请为“reducer.exe”文件重复该上传过程。

运行作业:使用 SSH 会话

以下过程说明如何使用 SSH 会话运行 MapReduce 作业:

  1. 使用 ssh 命令 连接到群集。 编辑以下命令,将 CLUSTERNAME 替换为群集的名称,然后输入该命令:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
    
  2. 使用以下命令之一启动 MapReduce 作业:

    • 如果默认存储为 Azure 存储

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files wasbs:///mapper.exe,wasbs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • 如果默认存储为 Data Lake Storage Gen2

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files abfs:///mapper.exe,abfs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      

    以下列表描述了每个参数和选项的含义:

    参数 DESCRIPTION
    hadoop-streaming.jar 指定包含流式处理 MapReduce 功能的 jar 文件。
    -files 为此作业指定 mapper.exe 和 reducer.exe 文件。 每个文件前的 wasbs:///adl:///abfs:/// 协议声明是群集默认存储的根目录的路径。
    -mapper 指定实现映射器的文件。
    -reducer 指定实现化简器的文件。
    -input 指定输入数据。
    -输出 指定输出目录。
  3. 完成 MapReduce 作业后,使用以下命令查看结果:

    hdfs dfs -text /example/wordcountout/part-00000
    

    以下文本是此命令返回的数据的示例:

    you     1128
    young   38
    younger 1
    youngest        1
    your    338
    yours   4
    yourself        34
    yourselves      3
    youth   17
    

运行作业:使用 PowerShell

使用以下 PowerShell 脚本运行 MapReduce 作业,并下载结果。

# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null) 
{
    Connect-AzAccount -Environment AzureChinaCloud
}
$context

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
# of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
if($storageType -eq 'azuredatalakestore') {
    # Azure Data Lake Store
    # Fie path is the root of the HDInsight storage + $outputPath
    $filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
    Export-AzDataLakeStoreItem `
        -Account $storageAccountName `
        -Path $filePath `
        -Destination output.txt
} else {
    # Az.Storage account
    # Get the container
    $container=$clusterInfo.DefaultStorageContainer
    #NOTE: This assumes that the storage account is in the same resource
    # group as HDInsight. If it is not, change the
    # --ResourceGroupName parameter to the group that contains storage.
    $storageAccountKey=(Get-AzStorageAccountKey `
        -Name $storageAccountName `
    -ResourceGroupName $resourceGroup)[0].Value

    #Create a storage context
    $context = New-AzStorageContext `
        -StorageAccountName $storageAccountName `
        -StorageAccountKey $storageAccountKey
    # Download the file
    Get-AzStorageBlobContent `
        -Blob 'example/wordcountoutput/part-00000' `
        -Container $container `
        -Destination output.txt `
        -Context $context
}

此脚本会提示用户提供群集登录的帐户名和密码,以及 HDInsight 群集名称。 作业完成后,输出会下载到名为 output.txt 的文件中。 以下文本是 output.txt 文件中数据的示例:

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17

后续步骤