import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountNewAPI {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// Specify the location of the NameNode
conf.set("fs.defaultFS", "hdfs://yourhostIP:port");
// Run the job as a user that has permission on HDFS ("yourHadoopUser" is a placeholder)
System.setProperty("HADOOP_USER_NAME", "yourHadoopUser");
Job job = Job.getInstance(conf, "Job name");
job.setJarByClass(WordCountNewAPI.class);
// Input and output directories on HDFS
Path input = new Path("/input");
Path output = new Path("/output");
// Multiple input paths may be added; a path can be a directory, or a directory plus a file name
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
job.setMapperClass(Map.class);
/* The Combiner is optional and does not change the result; it is used purely to improve efficiency. Here the Combiner reuses the Reduce class defined below: each map task's output is reduced locally first, and only those partial results are sent to the Reducer for the overall reduce. */
job.setCombinerClass(Reduce.class);
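/* Illustration (hypothetical numbers, not from the original post): if one map task emits
(hello,1), (hello,1), (world,1), the Combiner collapses that output to (hello,2), (world,1)
before it is shuffled to the Reducer, reducing the amount of data sent over the network. */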
job.setReducerClass(Reduce.class);
/* Hadoop defines several InputFormats for turning different kinds of input data into the <key, value> pairs that map can process. Here we use TextInputFormat (also Hadoop's default input format); the pairs it produces are of type <LongWritable, Text>. */
/* LongWritable and Text are Hadoop-specific data types; think of them as Java's long and String respectively. */
/* TextInputFormat splits the input at line breaks: the byte offset of each line becomes an output key, and the text content of that line becomes the value paired with that key. */
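/* Illustration (hypothetical file contents): for an input file whose two lines are
"hello world" and "hello hadoop", TextInputFormat hands the map function the pairs
(0, "hello world") and (12, "hello hadoop"); the keys 0 and 12 are the byte offsets
at which each line starts in the file. */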
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Run job
boolean succ = job.waitForCompletion(true);
if (!succ) {
System.out.println("Job failed, exiting");
System.exit(-1);
} else {
System.out.println("Job successfully done!");
}
}
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
// IntWritable is a Hadoop-specific data type; think of it as Java's int
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
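/* Illustration (hypothetical input): for a value of "hello hello world", this method
emits (hello,1), (hello,1), (world,1). */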
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
// Each extracted word is one occurrence of that word; record it here as (word, 1)
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
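/* Illustration (hypothetical values): for key "hello" with values [1, 1] (or [2] if the
Combiner already merged them), sum becomes 2 and (hello, 2) is written to the output. */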
int sum = 0;
// For each word, add up the number of times it appeared
for(IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
}
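To make the end-to-end behaviour concrete, here is a hypothetical run (the file contents, file names, and jar name below are made up for illustration; they are not part of the original program). Suppose /input holds a single text file containing the two lines:

hello hadoop
hello world

After the job finishes, the result under /output (typically in a file named part-r-00000) lists each word with its count, one per line:

hadoop	1
hello	2
world	1

Assuming the class has been packaged into, say, wordcount.jar, the job can be submitted with hadoop jar wordcount.jar WordCountNewAPI.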