Hadoop: Wie funktioniert OutputCollector während MapReduce?

Ich möchte wissen, ob die 'Instanz'-Ausgabe des OutputCollector in der Zuordnungsfunktion verwendet wird: output.collect (key, value) dieses Ergebnis speichert die Schlüsselwertpaare irgendwo? Auch wenn es an die Reduktionsfunktion geht, muss es sich um eine Zwischendatei handeln, oder? Was sind das für Dateien? Sind sie sichtbar und werden vom Programmierer festgelegt? Sind die OutputKeyClass und OutputValueClasses, die wir in der Hauptfunktion angeben, diese Speicherorte? [Text.class und IntWritable.class]

Ich gebe den Standardcode für Word Count in MapReduce an, den wir an vielen Stellen im Netz finden können.

public class WordCount {

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));    
JobClient.runJob(conf);
}
}

Antworten auf die Frage(3)

Ihre Antwort auf die Frage