MapReduce實作

前面介紹完了HDFS的基本操作與指令介紹後,接下來要來簡介MapReduce分散式運算的實作,終於可以寫點程式啦!

由於Hadoop原始碼是由Java所撰寫,當然MapReduce也需要使用Java來實作囉。本篇將會實作MapReduce界中的Hello World,word count。

WordCount的範例程式

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HadoopWordCountSample {
    public static class WordCountMapper
            extends Mapper<Object, Text, Text, IntWritable>{

        //計數使用,設定為1。每當找到相同的字就會+1。
        private final static IntWritable plugOne  = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            //使用StringTokenizer效能會比使用split好。預設使用空白、tab或是換行當作分隔符號。
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                word.set(st.nextToken());
                context.write(word, plugOne);
            }
        }
    }

    public static class WordCountReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int reduceSum = 0;
            for (IntWritable val : values) {
                reduceSum += val.get();
            }
            result.set(reduceSum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        Job job = Job.getInstance(config, "hadoop word count example");
        job.setJarByClass(HadoopWordCountSample.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapperClass(WordCountMapper.class);
        //設定setCombinerClass後,每個mapper會在sorting後,對結果先做一次reduce
        job.setCombinerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //執行程式時,第一個參數(agrs[0])為欲計算檔案路徑
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //第二個參數(agrs[1])為計算結果存放路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

使用說明

  • Step1. 複製程式碼:
    • 使用筆記本或是vivim等指令將上列程式碼複製貼上後存擋。假設檔案名稱為:HadoopWordCountSample.java
  • Step2. 設定環境。
    • 假設儲存程式碼的環境是安裝hadoop的其中一台node,已經設定完JAVA_HOMEHADOOP_HOME等環境變數,接下來只要設定HADOOP_CLASSPATH即可。
      #編輯~/.bashrc檔案
      sudo vi ~/.bashrc
      #加入HADOOP_CLASSPATH。注意!記得放在JAVA_HOME後面得行數,否則會出錯。
      export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
      
  • Step3. 編譯並打包程式碼。
    #Compile
    hadoop com.sun.tools.javac.Main HadoopWordCountSample.java
    #Package
    jar cf hwcs.jar HadoopWordCountSample*.class
    
  • Step4. 準備要計算的字數的檔案,並上傳至HDFS。
    寫好了word count程式碼,那也要有計算目標。為了讓MapReduce發揮淋漓盡致,需要將要計算的檔案上傳至HDFS,有了分散式儲存再加上分散式運算,真是天作之合啊!(威!離題了!)
    • 首先使用先在本機機器產生兩個檔案:wordcount_target1wordcount_target2,內容分別為:
      • wordcount_target1
        I am Jack
        I am the king of the world
        
      • wordcount_target2
        I am Rose
        I am looking for Jack
        
    • 接下來在HDFS上建立一個資料夾,並且將這兩個檔案上傳到資料夾內:
      #建立資料夾
      hadoop fs -mkdir -p /tmp/wordcount_target
      #上傳
      hadoop fs -put wordcount_target1 wordcount_target2 /tmp/wordcount_target
      
  • Step5. 在Hadoop上執行程式碼
    hadoop jar hwcs.jar HadoopWordCountSample /tmp/wordcount_target /tmp/wordcount_result
    #說明
    hwcs.jar:              要執行計算的MapReduce jar檔案路徑(需要包含的檔案名稱)
    HadoopWordCountSample: 欲執行的class name
    /tmp/wordcount_target: 欲計算檔案路徑
    /tmp/wordcount_result: 計算結果存放路徑
    

    如果MapReduce job成功送出到Hadoop上執行後,Yarn會接手資源控管,可以透過web ui: http://{host_or_ip}:8088 觀察MapReduce運作的情形。
    註:{host_or_ip}請輸入安裝時namenode所在的host name或是ip。如果是Standalone安裝模式,請輸入localhost

  • Step6. 查看結果:
    hadoop fs -cat /tmp/wordcount_result/part-r-00000
    #結果
    am 4
    for 1
    king 1
    looking 1
    of 1
    the 2
    world 1
    I 4
    Jack 2
    Rose 1
    

成功執行程式碼後,想必有些人應該是滿臉問號,下一篇就來解釋程式碼的運作流程吧。