Wysyłanie listy z mapy Hadoop Zmniejsz zadanie za pomocą niestandardowego zapisu

Próbuję utworzyć prostą mapę, zmniejszając zadanie, zmieniając przykład liczby słów podany przez hadoop.

Próbuję wymyślić listę zamiast liczby słów. Przykład ze słowem daje następujący wynik

hello 2
world 2

Próbuję go wyprowadzić jako listę, która będzie stanowić podstawę przyszłych prac

hello 1 1
world 1 1

Myślę, że jestem na dobrej drodze, ale mam problem z pisaniem listy. Zamiast tego otrzymuję

Hello   foo.MyArrayWritable@61250ff2
World   foo.MyArrayWritable@483a0ab1

Oto mój MyArrayWritable. Włożyłem sys wwrite(DataOuptut arg0) ale nigdy nie wypuszcza niczego, więc myślę, że ta metoda może nie zostać wywołana i nie wiem dlaczego.

class MyArrayWritable extends ArrayWritable{

public MyArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
    super(valueClass, values);
}
public MyArrayWritable(Class<? extends Writable> valueClass) {
    super(valueClass);
}

@Override
public IntWritable[] get() {
    return (IntWritable[]) super.get();
}

@Override
public void write(DataOutput arg0) throws IOException {
    for(IntWritable i : get()){
        i.write(arg0);
    }
}
}

EDYCJA - dodanie więcej kodu źródłowego

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
} 

public static class Reduce extends Reducer<Text, IntWritable, Text, MyArrayWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
        for (IntWritable val : values) {
            list.add(val);
        }
        context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
    }
}

public static void main(String[] args) throws Exception {
    if(args == null || args.length == 0)
        args = new String[]{"./wordcount/input","./wordcount/output"};
    Path p = new Path(args[1]);
    FileSystem fs = FileSystem.get(new Configuration());
    fs.exists(p);
    fs.delete(p, true);

    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}

}

questionAnswers(1)

yourAnswerToTheQuestion