Sunday, July 27, 2014

Add and read cache file in MapReduce program (2.2.0 API)

In MapReduce programming,
if you need your mappers or reducers to read the content of a file,
then it is necessary to
1. put that file on HDFS (a sketch of doing this from Java follows the list), and
2. add that file to the distributed cache through the Job API so that the mappers or reducers can access it.
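
For step 1 the upload can be done with the hdfs shell (hadoop fs -put) or from Java. Below is a minimal sketch of the Java route; the local path /tmp/filename is only a placeholder, and the HDFS destination matches the one used in the snippets below (imports: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path).

// in the driver, before configuring the Job (adjust the paths to your cluster)
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://ip:port");
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path("/tmp/filename"), new Path("hdfs://ip:port/folder/filename"));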

The Java code for step 2 is as follows:

In the main function:

Job job = Job.getInstance(conf, "jobname");
job.addCacheFile(new Path("hdfs://ip:port/folder/filename").toUri());

In the setup() of the mapper (or reducer):

URI[] uris = context.getCacheFiles();
Path path = new Path(uris[0].toString());
Now one may read the content of the file.
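
For example, in setup() the file can be opened through the FileSystem API. This is only a sketch, assuming the cached file is plain text (imports: java.io.BufferedReader, java.io.InputStreamReader, java.net.URI, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path).

// inside setup() of the mapper (or reducer)
URI[] uris = context.getCacheFiles();
Path path = new Path(uris[0].toString());
FileSystem fs = path.getFileSystem(context.getConfiguration());
BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
try {
    String line;
    while ((line = reader.readLine()) != null) {
        // use the line, e.g. load it into an in-memory map for lookups in map()/reduce()
    }
} finally {
    reader.close();
}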
In addition, if in the main function one uses job.addCacheFile to add multiple files, say

job.addCacheFile(new Path("hdfs://ip:port/folder/file1").toUri());
job.addCacheFile(new Path("hdfs://ip:port/folder/file2").toUri());
job.addCacheFile(new Path("hdfs://ip:port/folder/file3").toUri());
then file1's URI will be uris[0], file2's URI will be uris[1], and file3's URI will be uris[2].
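
If you would rather not depend on that ordering, here is a small sketch of picking a cached file out by its base name in setup() (using file2 from the example above):

URI[] uris = context.getCacheFiles();
Path file2Path = null;
for (URI uri : uris) {
    Path p = new Path(uri.toString());
    if ("file2".equals(p.getName())) {   // match by base name instead of position
        file2Path = p;
        break;
    }
}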

Wednesday, July 23, 2014

MapReduce - wordcount program explanation (hadoop 2.2.0 api)



import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountNewAPI {

 public static void main(String[] args) throws Exception {
  
      Configuration conf = new Configuration();
      // specify the NameNode location
      conf.set("fs.defaultFS", "hdfs://yourhostIP:port");

      // run the job as this HDFS user (replace "hadoopUser" with your own user name)
      System.setProperty("HADOOP_USER_NAME", "hadoopUser");
    
      Job job = Job.getInstance(conf, "Job name");

      job.setJarByClass(WordCountNewAPI.class);
   
      // specify the input and output directories on HDFS
      Path input = new Path("/input");
      Path output = new Path("/output");
      // there can be multiple input paths; each may be a directory, or a directory plus a file name
      FileInputFormat.addInputPath(job, input);
      FileOutputFormat.setOutputPath(job, output);
   
      job.setMapperClass(Map.class);
     /* A Combiner is optional and does not change the result; it is used only to improve efficiency.
        Here the Combiner reuses the Reduce class defined in this file: each map task's output is first
        reduced locally, and only then handed to the Reducer for the overall reduce. */
      job.setCombinerClass(Reduce.class);
      job.setReducerClass(Reduce.class);

    /* Hadoop provides several InputFormats that turn different kinds of input data into <key, value>
       pairs that map can process. Here we use TextInputFormat (which is also Hadoop's default input
       method); its output pair type is <LongWritable, Text>. */
    /* LongWritable and Text are Hadoop-specific data types; think of them as Java's long and String. */
    /* TextInputFormat splits the input at newlines: it takes the byte offset of a line as the output
       key, and the text content of that line as the output value corresponding to that key. */
     job.setInputFormatClass(TextInputFormat.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(IntWritable.class);
     // the final (reducer) output key/value types
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);

    // Run job
     boolean succ = job.waitForCompletion(true);
     if (!succ) {
         System.out.println("Job failed, exiting");
         System.exit(-1);
     } else {
         System.out.println("Job successfully done!");
     }
 }

 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
     // IntWritable is a Hadoop-specific data type; think of it as Java's int
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();

     @Override
     public void map(LongWritable key, Text value, Context context)
         throws IOException, InterruptedException {
       String line = value.toString();
       StringTokenizer tokenizer = new StringTokenizer(line);
       while (tokenizer.hasMoreTokens()) {
         word.set(tokenizer.nextToken());
     // the extracted word appears once; record it here
         context.write(word, one);
       }
     }
   }

 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
     @Override
     public void reduce(Text key, Iterable<IntWritable> values, Context context)
         throws IOException, InterruptedException {
       int sum = 0;
       // for each word, add up the number of times it appears
       for(IntWritable value : values) {
         sum += value.get();
       }
       context.write(key, new IntWritable(sum));
     }
   }
}
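
As a concrete illustration of the data flow, suppose the input directory contains a single file with the two lines below. Each map call tokenizes one line and emits a (word, 1) pair per token; the combiner/reducer then sums the counts per word, and TextOutputFormat writes one tab-separated key/value pair per line, sorted by key:

Input:
hello hadoop
hello world

Map output: (hello, 1), (hadoop, 1), (hello, 1), (world, 1)

Reduce output:
hadoop	1
hello	2
world	1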

Tuesday, July 15, 2014

Calling a native library from MapReduce: a memo

Reference:
http://hadoop.apache.org/docs/r2.2.0/hadoop-project-dist/hadoop-common/NativeLibraries.html

My revision:
This example shows you how to distribute a shared library, mylib.so, and load it from a MapReduce task.

1. First copy the library to HDFS, say to hdfs://ip:port/libraries/mylib.so

2. The job launching program should contain the following:
Job job = Job.getInstance(conf);
job.addFileToClassPath(new Path("hdfs://ip:port/libraries/mylib.so"));

3. The mapper and the reducer can then load the library, for example in their setup() method, as in the sketch below.
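
A minimal sketch of step 3 inside your Mapper (or Reducer) subclass. Note that System.loadLibrary("mylib") looks for a file named libmylib.so on java.library.path, so for a file localized as mylib.so it is usually easier to load it by absolute path with System.load. The sketch assumes the distributed cache has placed (or symlinked) mylib.so in the task's working directory:

@Override
protected void setup(Context context) {
    // assumption: mylib.so has been localized into the task's current working directory
    System.load(new java.io.File("mylib.so").getAbsolutePath());
}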