Ausgeben einer Liste aus einem Hadoop Map Reduce-Auftrag mit benutzerdefiniertem Schreibschutz

Ich versuche, einen einfachen Kartenverkleinerungsjob zu erstellen, indem ich das von hadoop angegebene Wordcount-Beispiel ändere.

Ich versuche, eine Liste anstelle der Anzahl der Wörter zu erstellen. Das wordcount-Beispiel gibt die folgende Ausgabe an

hello 2
world 2

Ich versuche, es als Liste auszugeben, die die Grundlage für die zukünftige Arbeit bilden wird

hello 1 1
world 1 1

Ich glaube, ich bin auf dem richtigen Weg, aber ich habe Probleme beim Schreiben der Liste. Stattdessen bekomme ich

Hello   foo.MyArrayWritable@61250ff2
World   foo.MyArrayWritable@483a0ab1

Hier ist mein MyArrayWritable. Ich habe ein Sys raus in diewrite(DataOuptut arg0) aber es gibt nie etwas aus, also denke ich, dass diese Methode möglicherweise nicht aufgerufen wird und ich weiß nicht warum.

class MyArrayWritable extends ArrayWritable{

public MyArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
    super(valueClass, values);
}
public MyArrayWritable(Class<? extends Writable> valueClass) {
    super(valueClass);
}

@Override
public IntWritable[] get() {
    return (IntWritable[]) super.get();
}

@Override
public void write(DataOutput arg0) throws IOException {
    for(IntWritable i : get()){
        i.write(arg0);
    }
}
}

EDIT - Hinzufügen von mehr Quellcode

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
} 

public static class Reduce extends Reducer<Text, IntWritable, Text, MyArrayWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
        for (IntWritable val : values) {
            list.add(val);
        }
        context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
    }
}

public static void main(String[] args) throws Exception {
    if(args == null || args.length == 0)
        args = new String[]{"./wordcount/input","./wordcount/output"};
    Path p = new Path(args[1]);
    FileSystem fs = FileSystem.get(new Configuration());
    fs.exists(p);
    fs.delete(p, true);

    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}

}

Antworten auf die Frage(1)

Ihre Antwort auf die Frage