Wednesday, July 23, 2014
MapReduce - WordCount example explained (Hadoop 2.2.0 API)

The complete WordCount program, with the explanation kept as inline comments:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountNewAPI {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Specify the location of the NameNode
        conf.set("fs.defaultFS", "hdfs://yourhostIP:port");
        System.setProperty("HADOOP_USER_NAME", "yourHadoopUser");

        Job job = Job.getInstance(conf, "Job name");
        job.setJarByClass(WordCountNewAPI.class);

        // Input and output directories on HDFS
        Path input = new Path("/input");
        Path output = new Path("/output");
        // There can be multiple input paths; each may be a directory,
        // or a directory plus a file name
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        job.setMapperClass(Map.class);
        /* The Combiner is optional and does not change the result; it is used
           only to improve efficiency. Here the Combiner reuses the Reduce class
           below: each map output is first reduced locally before being sent to
           the Reducer for the overall reduce. */
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        /* Hadoop defines several ways to turn different kinds of input data into
           key/value pairs that map can process. Here we use TextInputFormat (also
           Hadoop's default input method), whose output pair type is
           <LongWritable, Text>. */
        /* LongWritable and Text are Hadoop-specific data types; think of them
           roughly as Java's long and String. */
        /* TextInputFormat splits on newlines: it takes the offset of a line as the
           output key, and the text content of that line as the output value
           corresponding to that key. */
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Declare the final (reducer) output types as well
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Run job
        boolean succ = job.waitForCompletion(true);
        if (!succ) {
            System.out.println("Job failed, exiting");
            System.exit(-1);
        } else {
            System.out.println("Job successfully done!");
        }
    }

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        // IntWritable is a Hadoop-specific data type; think of it as Java's int
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                // The extracted word occurred once; record that here
                context.write(word, one);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // For each word, compute how many times it appeared
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}
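The program above hard-codes /input and /output. A common variant is to take the paths from the command line through Hadoop's Tool/ToolRunner interface, which also picks up generic options such as -D and -fs. The following is only a minimal sketch under that assumption; the class name WordCountDriver is made up here, and it reuses the Map and Reduce classes defined above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            return -1;
        }
        // getConf() already contains any -D / -fs options parsed by ToolRunner
        Job job = Job.getInstance(getConf(), "word count");
        job.setJarByClass(WordCountDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Reuse the Mapper/Reducer from the example above
        job.setMapperClass(WordCountNewAPI.Map.class);
        job.setCombinerClass(WordCountNewAPI.Reduce.class);
        job.setReducerClass(WordCountNewAPI.Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountDriver(), args));
    }
}

Packaged into a jar, this variant could be launched with something like hadoop jar wordcount.jar WordCountDriver /input /output, and with the default single reducer the result can be read back with hdfs dfs -cat /output/part-r-00000.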
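To check the Mapper and Reducer logic without a cluster, a unit test can also help. The sketch below assumes the separate Apache MRUnit library plus JUnit are on the classpath; the test class name WordCountTest is made up here.

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class WordCountTest {

    @Test
    public void mapEmitsOnePerToken() throws Exception {
        // Feed one (offset, line) record into Map and verify the emitted (word, 1) pairs
        MapDriver.newMapDriver(new WordCountNewAPI.Map())
                .withInput(new LongWritable(0), new Text("hello hello world"))
                .withOutput(new Text("hello"), new IntWritable(1))
                .withOutput(new Text("hello"), new IntWritable(1))
                .withOutput(new Text("world"), new IntWritable(1))
                .runTest();
    }

    @Test
    public void reduceSumsCounts() throws Exception {
        // Feed one key with its list of counts into Reduce and verify the sum
        ReduceDriver.newReduceDriver(new WordCountNewAPI.Reduce())
                .withInput(new Text("hello"),
                        Arrays.asList(new IntWritable(1), new IntWritable(1)))
                .withOutput(new Text("hello"), new IntWritable(2))
                .runTest();
    }
}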