Wednesday, July 23, 2014

MapReduce - WordCount program explained (Hadoop 2.2.0 API)



import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountNewAPI {

public static void main(String[] args) throws Exception{
  
      Configuration conf = new Configuration();
       // Specify the NameNode location (the default filesystem URI)
      conf.set("fs.defaultFS", "hdfs://yourhostIP:port");

      // Run the job as this HDFS user ("yourHadoopUser" is a placeholder)
      System.setProperty("HADOOP_USER_NAME", "yourHadoopUser");
    
      Job job = Job.getInstance(conf,"Job name");

      job.setJarByClass(WordCountNewAPI.class);
   
      // Specify the input and output directories on HDFS
      Path input = new Path("/input");
      Path output = new Path("/output");
      // Multiple input paths may be added; a path can point to a directory, or to a directory plus a file name (see the commented sketch below)
      FileInputFormat.addInputPath(job, input);
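      /* A minimal sketch (hypothetical paths, not part of this job) showing that
         addInputPath can be called more than once, with either a directory or a
         single file:
         FileInputFormat.addInputPath(job, new Path("/input2"));            // another directory
         FileInputFormat.addInputPath(job, new Path("/input3/part01.txt")); // a single file
      */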
      FileOutputFormat.setOutputPath(job, output);
   
      job.setMapperClass(Map.class);
     /* The Combiner is optional and does not change the result; it is used only to improve efficiency. Here the Combiner reuses the Reduce class defined below: it performs a local reduce on each map's output first, before that output is sent to the Reducer for the overall reduce. */
      job.setCombinerClass(Reduce.class);
      job.setReducerClass(Reduce.class);

    /* Hadoop defines several ways to turn different kinds of input data into the <key, value> pairs that map can process. Here we use TextInputFormat (also Hadoop's default input format), whose output pair type is <LongWritable, Text>. */
    /* LongWritable and Text are Hadoop-specific data types; think of them roughly as Java's long and String. */
    /* TextInputFormat splits the input on line breaks: it takes the byte offset of each line as the output key, and the text content of that line as the output value for that key. */
     job.setInputFormatClass(TextInputFormat.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(IntWritable.class);
     // Also declare the job's final (reducer) output key/value types
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);
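     /* A small worked example of what TextInputFormat produces (illustration only):
        if an input file contains the two lines
            hello world
            foo
        the mapper receives the pairs (0, "hello world") and (12, "foo"),
        where each key is the byte offset at which that line starts. */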

    // Run job
     boolean succ = job.waitForCompletion(true);
     if (! succ) {
         System.out.println("Job failed, exiting");
         System.exit(-1);
     }
     else{
         System.out.println("Job successfully done!");
     }
 }

 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
   // IntWritable is a Hadoop-specific data type; think of it roughly as Java's int
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();

     public void map(LongWritable key, Text value, Context context)
       throws IOException, InterruptedException {
       String line = value.toString();
       StringTokenizer tokenizer = new StringTokenizer(line);
       while (tokenizer.hasMoreTokens()) {
         word.set(tokenizer.nextToken());
      // Each token extracted counts as one occurrence of that word; record it here
         context.write(word, one);
       }
     }
   }
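  /* Worked example (illustration only): for the input line "hello hadoop hello",
     the map method above emits (hello, 1), (hadoop, 1), (hello, 1). */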

 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
     public void reduce(Text key, Iterable<IntWritable> values, Context context)
       throws IOException, InterruptedException {
       int sum = 0;
    // For each word, sum up the number of times it appeared
       for(IntWritable value : values) {
         sum += value.get();
       }
       context.write(key, new IntWritable(sum));
     }
   }
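  /* Continuing the example above: the framework groups the map (and combiner) output
     by key, so reduce sees something like (hello, [1, 1]) and (hadoop, [1]) and
     writes out (hello, 2) and (hadoop, 1). */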
}
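To try the program, one possible workflow (the jar name wordcount.jar is just a placeholder) is to put some text files under /input on HDFS, package this class into a jar, and run it with: hadoop jar wordcount.jar WordCountNewAPI. The word counts can then be inspected with: hdfs dfs -cat /output/part-r-00000 (this part file name assumes a single reducer). Note that /output must not already exist before the job starts, otherwise FileOutputFormat will reject the job.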
